1 /*
2    Copyright (c) 2000, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "ha_ndbcluster_glue.h"
26 #include "ha_ndbcluster.h"
27 #include "ha_ndbcluster_connection.h"
28 #include "ndb_local_connection.h"
29 #include "ndb_thd.h"
30 #include "ndb_table_guard.h"
31 #include "ndb_global_schema_lock.h"
32 #include "ndb_global_schema_lock_guard.h"
33 #include "ndb_tdc.h"
34 #include "ndb_name_util.h"
35 
36 #include "rpl_injector.h"
37 #include "rpl_filter.h"
38 #if MYSQL_VERSION_ID > 50600
39 #include "rpl_slave.h"
40 #else
41 #include "slave.h"
42 #include "log_event.h"
43 #endif
44 #include "binlog.h"
45 #include "ha_ndbcluster_binlog.h"
46 #include <ndbapi/NdbDictionary.hpp>
47 #include <ndbapi/ndb_cluster_connection.hpp>
48 #include "mysqld_thd_manager.h"  // Global_THD_manager
49 
/*
  Variables defined in other translation units and referenced from
  this file. They are declared 'extern' here instead of being pulled
  in through a header.
*/
extern my_bool opt_ndb_log_orig;
extern my_bool opt_ndb_log_bin;
extern my_bool opt_ndb_log_update_as_write;
extern my_bool opt_ndb_log_updated_only;
extern my_bool opt_ndb_log_update_minimal;
extern my_bool opt_ndb_log_binlog_index;
extern my_bool opt_ndb_log_apply_status;
extern ulong opt_ndb_extra_logging;
extern st_ndb_slave_state g_ndb_slave_state;
extern my_bool opt_ndb_log_transaction_id;
extern my_bool log_bin_use_v1_row_events;
extern my_bool opt_ndb_log_empty_update;

/* Prototypes only, no definition is visible in this part of the file */
bool ndb_log_empty_epochs(void);

void ndb_index_stat_restart();
66 
67 /*
68   defines for cluster replication table names
69 */
70 #include "ha_ndbcluster_tables.h"
71 
72 #include "ndb_dist_priv_util.h"
73 #include "ndb_anyvalue.h"
74 #include "ndb_binlog_extra_row_info.h"
75 #include "ndb_event_data.h"
76 #include "ndb_schema_object.h"
77 #include "ndb_schema_dist.h"
78 #include "ndb_repl_tab.h"
79 #include "ndb_binlog_thread.h"
80 #include "ndb_find_files_list.h"
81 
/*
  Timeout for syncing schema events between
  mysql servers, and between mysql server and the binlog.
  NOTE(review): the unit is not stated here, presumably seconds -
  confirm against the code using this constant.
*/
static const int DEFAULT_SYNC_TIMEOUT= 120;
87 
/*
  Column numbers in the ndb_binlog_index table.
  The values are zero based positions, grouped by the schema version
  ('v2', 'v3') in which the columns appear.
*/
enum Ndb_binlog_index_cols
{
  NBICOL_START_POS                 = 0
  ,NBICOL_START_FILE               = 1
  ,NBICOL_EPOCH                    = 2
  ,NBICOL_NUM_INSERTS              = 3
  ,NBICOL_NUM_UPDATES              = 4
  ,NBICOL_NUM_DELETES              = 5
  ,NBICOL_NUM_SCHEMAOPS            = 6
  /* Following columns in schema 'v2' */
  ,NBICOL_ORIG_SERVERID            = 7
  ,NBICOL_ORIG_EPOCH               = 8
  ,NBICOL_GCI                      = 9
  /* Following columns in schema 'v3' */
  ,NBICOL_NEXT_POS                 = 10
  ,NBICOL_NEXT_FILE                = 11
};
106 
/*
  Flag showing if the ndb binlog should be created:
  TRUE if so, FALSE if not
*/
my_bool ndb_binlog_running= FALSE;
/*
  Checked by ndb_binlog_is_read_only(): while FALSE the ndb_* system
  tables are considered not set up yet
*/
static my_bool ndb_binlog_tables_inited= FALSE;
/*
  Checked by ndb_binlog_is_read_only(): while FALSE the binlog thread
  is considered not ready (still initializing or has lost connection)
*/
static my_bool ndb_binlog_is_ready= FALSE;
114 
115 bool
ndb_binlog_is_read_only(void)116 ndb_binlog_is_read_only(void)
117 {
118   if(!ndb_binlog_tables_inited)
119   {
120     /* the ndb_* system tables not setup yet */
121     return true;
122   }
123 
124   if (ndb_binlog_running && !ndb_binlog_is_ready)
125   {
126     /*
127       The binlog thread is supposed to write to binlog
128       but not ready (still initializing or has lost connection)
129     */
130     return true;
131   }
132   return false;
133 }
134 
/*
  Global reference to the ndb injector thread THD object

  Has one sole purpose, for setting the in_use table member variable
  in get_share(...)
*/
extern THD * injector_thd; // Declared in ha_ndbcluster.cc
142 
/*
  Global reference to the ndb injector Ndb object.
  NOTE(review): this comment used to say "thd object", but the
  variables below are declared as Ndb pointers.

  Used mainly by the binlog index thread, but exposed to the client sql
  thread for one reason; to setup the events operations for a table
  to enable ndb injector thread receiving events.

  Must therefore always be used with a surrounding
  native_mutex_lock(&injector_mutex), when doing create/dropEventOperation
*/
static Ndb *injector_ndb= 0;
static Ndb *schema_ndb= 0;

/*
  Tested and reset to 0 by ndbcluster_binlog_end() before the binlog
  thread is stopped and the mutexes/condition below are destroyed
*/
static int ndbcluster_binlog_inited= 0;

/*
  Mutex and condition used for interacting between client sql thread
  and injector thread
*/
static native_mutex_t injector_mutex;
static native_cond_t  injector_cond;

/*
  NDB Injector thread (used for binlog creation)

  ndb_latest_handled_binlog_epoch is what ndbcluster_binlog_wait()
  compares against the waiting session's last committed epoch.
*/
static ulonglong ndb_latest_applied_binlog_epoch= 0;
static ulonglong ndb_latest_handled_binlog_epoch= 0;
static ulonglong ndb_latest_received_binlog_epoch= 0;

NDB_SHARE *ndb_apply_status_share= 0;
NDB_SHARE *ndb_schema_share= 0;
static native_mutex_t ndb_schema_share_mutex;

extern my_bool opt_log_slave_updates;
static my_bool g_ndb_log_slave_updates;

static bool g_injector_v1_warning_emitted = false;
178 
179 #ifndef NDEBUG
print_records(TABLE * table,const uchar * record)180 static void print_records(TABLE *table, const uchar *record)
181 {
182   for (uint j= 0; j < table->s->fields; j++)
183   {
184     char buf[40];
185     int pos= 0;
186     Field *field= table->field[j];
187     const uchar* field_ptr= field->ptr - table->record[0] + record;
188     int pack_len= field->pack_length();
189     int n= pack_len < 10 ? pack_len : 10;
190 
191     for (int i= 0; i < n && pos < 20; i++)
192     {
193       pos+= sprintf(&buf[pos]," %x", (int) (uchar) field_ptr[i]);
194     }
195     buf[pos]= 0;
196     DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
197   }
198 }
199 #else
200 #define print_records(a,b)
201 #endif
202 
203 
#ifndef NDEBUG
/*
  Debug helper, dumps the layout of 'table' to the DBUG trace.

  info   label printed first on the summary line
  table  table to describe, may be 0 in which case only
         "<info>: (null)" is printed

  Prints one summary line for the table followed by one line per
  field, plus an extra line for fields of type MYSQL_TYPE_BIT.

  NOTE(review): pointers are cast to long and printed with %lx, on a
  platform where long is narrower than a pointer the printed value
  would be truncated - confirm whether that matters.

  Compiled to an empty macro when NDEBUG is defined.
*/
static void dbug_print_table(const char *info, TABLE *table)
{
  if (table == 0)
  {
    DBUG_PRINT("info",("%s: (null)", info));
    return;
  }
  /* Summary: name, field count, record sizes and record buffers */
  DBUG_PRINT("info",
             ("%s: %s.%s s->fields: %d  "
              "reclength: %lu  rec_buff_length: %u  record[0]: 0x%lx  "
              "record[1]: 0x%lx",
              info,
              table->s->db.str,
              table->s->table_name.str,
              table->s->fields,
              table->s->reclength,
              table->s->rec_buff_length,
              (long) table->record[0],
              (long) table->record[1]));

  for (unsigned int i= 0; i < table->s->fields; i++)
  {
    Field *f= table->field[i];
    /*
      Per field: name, raw flags followed by their decoded form,
      type, pack length, value pointer with its offset in record[0]
      and the null bit information
    */
    DBUG_PRINT("info",
               ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d  pack_length: %d  "
                "ptr: 0x%lx[+%d]  null_bit: %u  null_ptr: 0x%lx[+%d]",
                i,
                f->field_name,
                (long) f->flags,
                (f->flags & PRI_KEY_FLAG)  ? "pri"       : "attr",
                (f->flags & NOT_NULL_FLAG) ? ""          : ",nullable",
                (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
                (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
                (f->flags & BLOB_FLAG)     ? ",blob"     : "",
                (f->flags & BINARY_FLAG)   ? ",binary"   : "",
                f->real_type(),
                f->pack_length(),
                (long) f->ptr, (int) (f->ptr - table->record[0]),
                f->null_bit,
                (long) f->null_offset(0),
                (int) f->null_offset()));
    if (f->type() == MYSQL_TYPE_BIT)
    {
      /* Bit fields additionally show where their bits are stored */
      Field_bit *g= (Field_bit*) f;
      DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d  bit_ptr: 0x%lx[+%d] "
                                   "bit_ofs: %d  bit_len: %u",
                                   g->field_length, (long) g->bit_ptr,
                                   (int) ((uchar*) g->bit_ptr -
                                          table->record[0]),
                                   g->bit_ofs, g->bit_len));
    }
  }
}
#else
#define dbug_print_table(a,b)
#endif
261 
262 
run_query(THD * thd,char * buf,char * end,const int * no_print_error)263 static void run_query(THD *thd, char *buf, char *end,
264                       const int *no_print_error)
265 {
266   /*
267     NOTE! Don't use this function for new implementation, backward
268     compat. only
269   */
270 
271   Ndb_local_connection mysqld(thd);
272 
273   /*
274     Run the query, suppress some errors from being printed
275     to log and ignore any error returned
276   */
277   (void)mysqld.raw_run_query(buf, (end - buf),
278                              no_print_error);
279 }
280 
281 static void
ndb_binlog_close_shadow_table(NDB_SHARE * share)282 ndb_binlog_close_shadow_table(NDB_SHARE *share)
283 {
284   DBUG_ENTER("ndb_binlog_close_shadow_table");
285   Ndb_event_data *event_data= share->event_data;
286   if (event_data)
287   {
288     delete event_data;
289     share->event_data= 0;
290   }
291   DBUG_VOID_RETURN;
292 }
293 
294 
295 /*
296   Open a shadow table for the table given in share.
297   - The shadow table is (mainly) used when an event is
298     received from the data nodes which need to be written
299     to the binlog injector.
300 */
301 
302 static int
ndb_binlog_open_shadow_table(THD * thd,NDB_SHARE * share)303 ndb_binlog_open_shadow_table(THD *thd, NDB_SHARE *share)
304 {
305   int error;
306   assert(share->event_data == 0);
307   Ndb_event_data *event_data= share->event_data= new Ndb_event_data(share);
308   DBUG_ENTER("ndb_binlog_open_shadow_table");
309 
310   MEM_ROOT **root_ptr= my_thread_get_THR_MALLOC();
311   MEM_ROOT *old_root= *root_ptr;
312   init_sql_alloc(PSI_INSTRUMENT_ME, &event_data->mem_root, 1024, 0);
313   *root_ptr= &event_data->mem_root;
314 
315   TABLE_SHARE *shadow_table_share=
316     (TABLE_SHARE*)alloc_root(&event_data->mem_root, sizeof(TABLE_SHARE));
317   TABLE *shadow_table=
318     (TABLE*)alloc_root(&event_data->mem_root, sizeof(TABLE));
319 
320   init_tmp_table_share(thd, shadow_table_share,
321                        share->db, 0,
322                        share->table_name,
323                        share->key_string());
324   if ((error= open_table_def(thd, shadow_table_share, 0)) ||
325       (error= open_table_from_share(thd, shadow_table_share, "", 0,
326                                     (uint) (OPEN_FRM_FILE_ONLY | DELAYED_OPEN | READ_ALL),
327                                     0, shadow_table,
328                                     false
329                                     )))
330   {
331     DBUG_PRINT("error", ("failed to open shadow table, error: %d my_errno: %d",
332                          error, my_errno()));
333     free_table_share(shadow_table_share);
334     delete event_data;
335     share->event_data= 0;
336     *root_ptr= old_root;
337     DBUG_RETURN(error);
338   }
339   event_data->shadow_table= shadow_table;
340 
341   mysql_mutex_lock(&LOCK_open);
342   assign_new_table_id(shadow_table_share);
343   mysql_mutex_unlock(&LOCK_open);
344 
345   shadow_table->in_use= injector_thd;
346 
347 
348   // Allocate strings for db and table_name for shadow_table
349   // in event_data's MEM_ROOT(where the shadow_table itself is allocated)
350   lex_string_copy(&event_data->mem_root,
351                   &shadow_table->s->db,
352                   share->db);
353   lex_string_copy(&event_data->mem_root,
354                   &shadow_table->s->table_name,
355                   share->table_name);
356 
357   /* We can't use 'use_all_columns()' as the file object is not setup yet */
358   shadow_table->column_bitmaps_set_no_signal(&shadow_table->s->all_set,
359                                              &shadow_table->s->all_set);
360 
361   if (shadow_table->s->primary_key == MAX_KEY)
362    share->flags|= NSF_HIDDEN_PK;
363 
364   if (shadow_table->s->blob_fields != 0)
365     share->flags|= NSF_BLOB_FLAG;
366 
367   event_data->init_pk_bitmap();
368 
369 #ifndef NDEBUG
370   dbug_print_table("table", shadow_table);
371 #endif
372   *root_ptr= old_root;
373   DBUG_RETURN(0);
374 }
375 
376 
377 /*
378   Initialize the binlog part of the NDB_SHARE
379 */
ndbcluster_binlog_init_share(THD * thd,NDB_SHARE * share,TABLE * _table)380 int ndbcluster_binlog_init_share(THD *thd, NDB_SHARE *share, TABLE *_table)
381 {
382   DBUG_ENTER("ndbcluster_binlog_init_share");
383 
384   if (!share->need_events(ndb_binlog_running))
385   {
386     if (_table)
387     {
388       if (_table->s->primary_key == MAX_KEY)
389         share->flags|= NSF_HIDDEN_PK;
390       if (_table->s->blob_fields != 0)
391         share->flags|= NSF_BLOB_FLAG;
392     }
393     else
394     {
395       share->flags|= NSF_NO_BINLOG;
396     }
397     DBUG_RETURN(0);
398   }
399 
400   DBUG_RETURN(ndb_binlog_open_shadow_table(thd, share));
401 }
402 
/*
  Read the values of all blob fields of 'table' into one shared buffer
  and point each blob field at its part of that buffer.

  table        table whose fields are examined
  value_array  one NdbValue per field, indexed like table->field;
               blob fields whose 'blob' member is NULL are skipped
  buffer       in/out, buffer holding the blob data; freed and
               reallocated when it is too small
  buffer_size  in/out, current size of 'buffer' in bytes
  ptrdiff      passed on to Field_blob::set_ptr_offset() for every
               blob field that is set

  The fields are traversed twice. The first pass (loop == 0) only sums
  up the space needed, each blob length rounded up to a multiple of 8,
  and grows the buffer if required. The second pass (loop == 1) reads
  the data into the buffer and sets pointer/length of each blob field.

  Returns 0 on success and -1 if an NdbBlob call or the buffer
  allocation fails. After a failed allocation 'buffer' is NULL and
  'buffer_size' is 0.
*/
static int
get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
                    uchar*& buffer, uint& buffer_size,
                    my_ptrdiff_t ptrdiff)
{
  DBUG_ENTER("get_ndb_blobs_value");

  // Field has no field number so cannot use TABLE blob_field
  // Loop twice, first only counting total buffer size
  for (int loop= 0; loop <= 1; loop++)
  {
    // Running offset into 'buffer', restarted for each pass
    uint32 offset= 0;
    for (uint i= 0; i < table->s->fields; i++)
    {
      Field *field= table->field[i];
      NdbValue value= value_array[i];
      if (! (field->flags & BLOB_FLAG))
        continue;
      if (value.blob == NULL)
      {
        DBUG_PRINT("info",("[%u] skipped", i));
        continue;
      }
      Field_blob *field_blob= (Field_blob *)field;
      NdbBlob *ndb_blob= value.blob;
      int isNull;
      if (ndb_blob->getNull(isNull) != 0)
        DBUG_RETURN(-1);
      if (isNull == 0) {
        Uint64 len64= 0;
        if (ndb_blob->getLength(len64) != 0)
          DBUG_RETURN(-1);
        // Align to Uint64
        // NOTE(review): len64 is narrowed to 32 bits here
        uint32 size= Uint32(len64);
        if (size % 8 != 0)
          size+= 8 - size % 8;
        if (loop == 1)
        {
          uchar *buf= buffer + offset;
          // Ask for as much as possible, readData() updates 'len'
          // (it is asserted to equal the blob length below)
          uint32 len= 0xffffffff;  // Max uint32
          if (ndb_blob->readData(buf, len) != 0)
            DBUG_RETURN(-1);
          DBUG_PRINT("info", ("[%u] offset: %u  buf: 0x%lx  len=%u  [ptrdiff=%d]",
                              i, offset, (long) buf, len, (int)ptrdiff));
          assert(len == len64);
          // Ugly hack assumes only ptr needs to be changed
          field_blob->set_ptr_offset(ptrdiff, len, buf);
        }
        // Space is reserved in both passes so the offsets agree
        offset+= size;
      }
      else if (loop == 1) // undefined or null
      {
        // have to set length even in this case
        uchar *buf= buffer + offset; // or maybe NULL
        uint32 len= 0;
        field_blob->set_ptr_offset(ptrdiff, len, buf);
        DBUG_PRINT("info", ("[%u] isNull=%d", i, isNull));
        }
    }
    // After the counting pass, replace the buffer if it is too small.
    // The old content is not preserved.
    if (loop == 0 && offset > buffer_size)
    {
      my_free(buffer);
      buffer_size= 0;
      DBUG_PRINT("info", ("allocate blobs buffer size %u", offset));
      buffer= (uchar*) my_malloc(PSI_INSTRUMENT_ME, offset, MYF(MY_WME));
      if (buffer == NULL)
      {
        sql_print_error("get_ndb_blobs_value: my_malloc(%u) failed", offset);
        DBUG_RETURN(-1);
      }
      buffer_size= offset;
    }
  }
  DBUG_RETURN(0);
}
478 
479 
480 /*****************************************************************
481   functions called from master sql client threads
482 ****************************************************************/
483 
484 /*
485   called in mysql_show_binlog_events and reset_logs to make sure we wait for
486   all events originating from the 'thd' to arrive in the binlog.
487 
488   'thd' is expected to be non-NULL.
489 
490   Wait for the epoch in which the last transaction of the 'thd' is a part of.
491 
492   Wait a maximum of 30 seconds.
493 */
ndbcluster_binlog_wait(THD * thd)494 static void ndbcluster_binlog_wait(THD *thd)
495 {
496   if (ndb_binlog_running)
497   {
498     DBUG_ENTER("ndbcluster_binlog_wait");
499     assert(thd);
500     assert(thd_sql_command(thd) == SQLCOM_SHOW_BINLOG_EVENTS ||
501            thd_sql_command(thd) == SQLCOM_FLUSH ||
502            thd_sql_command(thd) == SQLCOM_RESET);
503     /*
504       Binlog Injector should not wait for itself
505     */
506     if (thd->system_thread == SYSTEM_THREAD_NDBCLUSTER_BINLOG)
507       DBUG_VOID_RETURN;
508 
509     Thd_ndb *thd_ndb = get_thd_ndb(thd);
510     if (!thd_ndb)
511     {
512       /*
513        thd has not interfaced with ndb before
514        so there is no need for waiting
515       */
516        DBUG_VOID_RETURN;
517     }
518 
519     const char *save_info = thd->proc_info;
520     thd->proc_info = "Waiting for ndbcluster binlog update to "
521 	"reach current position";
522 
523     const Uint64 start_handled_epoch = ndb_latest_handled_binlog_epoch;
524    /*
525      Highest epoch that a transaction against Ndb has received
526      as part of commit processing *in this thread*. This is a
527      per-session 'most recent change' indicator.
528     */
529     const Uint64 session_last_committed_epoch =
530       thd_ndb->m_last_commit_epoch_session;
531 
532     /*
533      * Wait until the last committed epoch from the session enters Binlog.
534      * Break any possible deadlock after 30s.
535      */
536     int count = 30;
537 
538     native_mutex_lock(&injector_mutex);
539     while (!thd->killed && count && ndb_binlog_running &&
540            (ndb_latest_handled_binlog_epoch == 0 ||
541             ndb_latest_handled_binlog_epoch < session_last_committed_epoch))
542     {
543       count--;
544       struct timespec abstime;
545       set_timespec(&abstime, 1);
546       native_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
547     }
548     native_mutex_unlock(&injector_mutex);
549 
550     if (count == 0)
551     {
552       sql_print_warning("NDB: Thread id %u timed out (30s) waiting for epoch %u/%u "
553                         "to be handled.  Progress : %u/%u -> %u/%u.",
554                         thd->thread_id(),
555                         Uint32((session_last_committed_epoch >> 32) & 0xffffffff),
556                         Uint32(session_last_committed_epoch & 0xffffffff),
557                         Uint32((start_handled_epoch >> 32) & 0xffffffff),
558                         Uint32(start_handled_epoch & 0xffffffff),
559                         Uint32((ndb_latest_handled_binlog_epoch >> 32) & 0xffffffff),
560                         Uint32(ndb_latest_handled_binlog_epoch & 0xffffffff));
561 
562       // Fail on wait/deadlock timeout in debug compile
563       assert(false);
564     }
565 
566     thd->proc_info= save_info;
567     DBUG_VOID_RETURN;
568   }
569 }
570 
571 /*
572  Called from MYSQL_BIN_LOG::reset_logs in log.cc when binlog is emptied
573 */
ndbcluster_reset_logs(THD * thd)574 static int ndbcluster_reset_logs(THD *thd)
575 {
576   if (!ndb_binlog_running)
577     return 0;
578 
579   /* only reset master should reset logs */
580   if (!((thd->lex->sql_command == SQLCOM_RESET) &&
581         (thd->lex->type & REFRESH_MASTER)))
582     return 0;
583 
584   DBUG_ENTER("ndbcluster_reset_logs");
585 
586   /*
587     Wait for all events originating from this mysql server has
588     reached the binlog before continuing to reset
589   */
590   ndbcluster_binlog_wait(thd);
591 
592   /*
593     Truncate mysql.ndb_binlog_index table, if table does not
594     exist ignore the error as it is a "consistent" behavior
595   */
596   Ndb_local_connection mysqld(thd);
597   const bool ignore_no_such_table = true;
598   if(mysqld.truncate_table(STRING_WITH_LEN("mysql"),
599                            STRING_WITH_LEN("ndb_binlog_index"),
600                            ignore_no_such_table))
601   {
602     // Failed to truncate table
603     DBUG_RETURN(1);
604   }
605   DBUG_RETURN(0);
606 }
607 
/*
  Setup THD object
  'Inspired' from ha_ndbcluster.cc : ndb_util_thread_func

  stackptr  stored as thread_stack of the new THD

  Returns the new THD. Returns 0 if the THD could not be created or
  if its store_globals() failed, in the latter case the THD has
  already been deleted.

  The THD is set up as a COM_DAEMON system thread of type
  SYSTEM_THREAD_NDBCLUSTER_BINLOG with grants skipped and utf8 as
  client, results and connection character set.
*/
THD *
ndb_create_thd(char * stackptr)
{
  DBUG_ENTER("ndb_create_thd");
  THD * thd= new THD; /* note that constructor of THD uses DBUG_ */
  if (thd == 0)
  {
    DBUG_RETURN(0);
  }
  THD_CHECK_SENTRY(thd);

  thd->thread_stack= stackptr; /* remember where our stack is */
  if (thd->store_globals())
  {
    delete thd;
    DBUG_RETURN(0);
  }

  thd->init_for_queries();
  thd_set_command(thd, COM_DAEMON);
  thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
#ifndef NDB_THD_HAS_NO_VERSION
  thd->version= refresh_version;
#endif
  thd->get_protocol_classic()->set_client_capabilities(0);
  thd->lex->start_transaction_opt= 0;
  thd->security_context()->skip_grants();

  /*
    NOTE(review): the charset lookup result is used without a NULL
    check - confirm that "utf8" is always available here
  */
  CHARSET_INFO *charset_connection= get_charset_by_csname("utf8",
                                                          MY_CS_PRIMARY,
                                                          MYF(MY_WME));
  thd->variables.character_set_client= charset_connection;
  thd->variables.character_set_results= charset_connection;
  thd->variables.collation_connection= charset_connection;
  thd->update_charset();
  DBUG_RETURN(thd);
}
649 
650 /*
651   Called from MYSQL_BIN_LOG::purge_logs in log.cc when the binlog "file"
652   is removed
653 */
654 
655 static int
ndbcluster_binlog_index_purge_file(THD * passed_thd,const char * file)656 ndbcluster_binlog_index_purge_file(THD *passed_thd, const char *file)
657 {
658   int stack_base = 0;
659   int error = 0;
660   DBUG_ENTER("ndbcluster_binlog_index_purge_file");
661   DBUG_PRINT("enter", ("file: %s", file));
662 
663   if (!ndb_binlog_running || (passed_thd && passed_thd->slave_thread))
664     DBUG_RETURN(0);
665 
666   /**
667    * This function cannot safely reuse the passed thd object
668    * due to the variety of places from which it is called.
669    *   new/delete one...yuck!
670    */
671   THD* my_thd;
672   if ((my_thd = ndb_create_thd((char*)&stack_base) /* stack ptr */) == 0)
673   {
674     /**
675      * TODO return proper error code here,
676      * BUT! return code is not (currently) checked in
677      *      log.cc : purge_index_entry() so we settle for warning printout
678      * Will sql_print_warning fail with no thd?
679      */
680     sql_print_warning("NDB: Unable to purge "
681                       NDB_REP_DB "." NDB_REP_TABLE
682                       " File=%s (failed to setup thd)", file);
683     DBUG_RETURN(0);
684   }
685 
686 
687   /*
688     delete rows from mysql.ndb_binlog_index table for the given
689     filename, if table does not exist ignore the error as it
690     is a "consistent" behavior
691   */
692   Ndb_local_connection mysqld(my_thd);
693   const bool ignore_no_such_table = true;
694   if(mysqld.delete_rows(STRING_WITH_LEN("mysql"),
695                         STRING_WITH_LEN("ndb_binlog_index"),
696                         ignore_no_such_table,
697                         "File='", file, "'", NULL))
698   {
699     // Failed to delete rows from table
700     error = 1;
701   }
702 
703   delete my_thd;
704 
705   if (passed_thd)
706   {
707     /* Relink passed THD with this thread */
708     passed_thd->store_globals();
709   }
710 
711   DBUG_RETURN(error);
712 }
713 
714 
715 // Determine if privilege tables are distributed, ie. stored in NDB
716 bool
priv_tables_are_in_ndb(THD * thd)717 Ndb_dist_priv_util::priv_tables_are_in_ndb(THD* thd)
718 {
719   bool distributed= false;
720   Ndb_dist_priv_util dist_priv;
721   DBUG_ENTER("ndbcluster_distributed_privileges");
722 
723   Ndb *ndb= check_ndb_in_thd(thd);
724   if (!ndb)
725     DBUG_RETURN(false); // MAGNUS, error message?
726 
727   if (ndb->setDatabaseName(dist_priv.database()) != 0)
728     DBUG_RETURN(false);
729 
730   const char* table_name;
731   while((table_name= dist_priv.iter_next_table()))
732   {
733     DBUG_PRINT("info", ("table_name: %s", table_name));
734     Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
735     const NDBTAB *ndbtab= ndbtab_g.get_table();
736     if (ndbtab)
737     {
738       distributed= true;
739     }
740     else if (distributed)
741     {
742       sql_print_error("NDB: Inconsistency detected in distributed "
743                       "privilege tables. Table '%s.%s' is not distributed",
744                       dist_priv.database(), table_name);
745       DBUG_RETURN(false);
746     }
747   }
748   DBUG_RETURN(distributed);
749 }
750 
751 
752 /*
753   ndbcluster_binlog_log_query
754 
755    - callback function installed in handlerton->binlog_log_query
756    - called by MySQL Server in places where no other handlerton
757      function exists which can be used to notify about changes
758    - used by ndbcluster to detect when
759      -- databases are created or altered
760      -- privilege tables have been modified
761 */
762 
763 static void
ndbcluster_binlog_log_query(handlerton * hton,THD * thd,enum_binlog_command binlog_command,const char * query,uint query_length,const char * db,const char * table_name)764 ndbcluster_binlog_log_query(handlerton *hton, THD *thd,
765                             enum_binlog_command binlog_command,
766                             const char *query, uint query_length,
767                             const char *db, const char *table_name)
768 {
769   DBUG_ENTER("ndbcluster_binlog_log_query");
770   DBUG_PRINT("enter", ("db: %s  table_name: %s  query: %s",
771                        db, table_name, query));
772   enum SCHEMA_OP_TYPE type;
773   /* Use random table_id and table_version  */
774   const uint32 table_id = (uint32)rand();
775   const uint32 table_version = (uint32)rand();
776   switch (binlog_command)
777   {
778   case LOGCOM_CREATE_DB:
779     DBUG_PRINT("info", ("New database '%s' created", db));
780     type= SOT_CREATE_DB;
781     break;
782 
783   case LOGCOM_ALTER_DB:
784     DBUG_PRINT("info", ("The database '%s' was altered", db));
785     type= SOT_ALTER_DB;
786     break;
787 
788   case LOGCOM_ACL_NOTIFY:
789     DBUG_PRINT("info", ("Privilege tables have been modified"));
790     type= SOT_GRANT;
791     if (!Ndb_dist_priv_util::priv_tables_are_in_ndb(thd))
792     {
793       DBUG_VOID_RETURN;
794     }
795     /*
796       NOTE! Grant statements with db set to NULL is very rare but
797       may be provoked by for example dropping the currently selected
798       database. Since ndbcluster_log_schema_op does not allow
799       db to be NULL(can't create a key for the ndb_schem_object nor
800       writeNULL to ndb_schema), the situation is salvaged by setting db
801       to the constant string "mysql" which should work in most cases.
802 
803       Interestingly enough this "hack" has the effect that grant statements
804       are written to the remote binlog in same format as if db would have
805       been NULL.
806     */
807     if (!db)
808       db = "mysql";
809     break;
810 
811   default:
812     DBUG_PRINT("info", ("Ignoring binlog_log_query notification"));
813     DBUG_VOID_RETURN;
814     break;
815 
816   }
817   ndbcluster_log_schema_op(thd, query, query_length,
818                            db, table_name, table_id, table_version, type,
819                            NULL, NULL);
820   DBUG_VOID_RETURN;
821 }
822 
/* Defined elsewhere, called first thing by ndbcluster_binlog_end() */
extern void ndb_util_thread_stop(void);

// Instantiate Ndb_binlog_thread component
// (stopped and deinitialized by ndbcluster_binlog_end())
static Ndb_binlog_thread ndb_binlog_thread;
827 
828 
829 /*
830   End use of the NDB Cluster binlog
831    - wait for binlog thread to shutdown
832 */
833 
ndbcluster_binlog_end(THD * thd)834 int ndbcluster_binlog_end(THD *thd)
835 {
836   DBUG_ENTER("ndbcluster_binlog_end");
837 
838   // Stop ndb_util_thread first since it uses THD(which
839   // implicitly depend on binlog)
840   ndb_util_thread_stop();
841 
842   if (ndbcluster_binlog_inited)
843   {
844     ndbcluster_binlog_inited= 0;
845 
846     ndb_binlog_thread.stop();
847     ndb_binlog_thread.deinit();
848 
849     native_mutex_destroy(&injector_mutex);
850     native_cond_destroy(&injector_cond);
851     native_mutex_destroy(&ndb_schema_share_mutex);
852   }
853 
854   DBUG_RETURN(0);
855 }
856 
857 /*****************************************************************
858   functions called from slave sql client threads
859 ****************************************************************/
ndbcluster_reset_slave(THD * thd)860 static void ndbcluster_reset_slave(THD *thd)
861 {
862   int error = 0;
863   if (!ndb_binlog_running)
864     return;
865 
866   DBUG_ENTER("ndbcluster_reset_slave");
867 
868   /*
869     delete all rows from mysql.ndb_apply_status table
870     - if table does not exist ignore the error as it
871       is a consistent behavior
872   */
873   Ndb_local_connection mysqld(thd);
874   const bool ignore_no_such_table = true;
875   if(mysqld.delete_rows(STRING_WITH_LEN("mysql"),
876                         STRING_WITH_LEN("ndb_apply_status"),
877                         ignore_no_such_table,
878                         NULL))
879   {
880     // Failed to delete rows from table
881     error = 1;
882   }
883 
884   g_ndb_slave_state.atResetSlave();
885 
886   // pending fix for bug#59844 will make this function return int
887   DBUG_VOID_RETURN;
888 }
889 
890 /*
891   Initialize the binlog part of the ndb handlerton
892 */
893 
ndbcluster_binlog_func(handlerton * hton,THD * thd,enum_binlog_func fn,void * arg)894 static int ndbcluster_binlog_func(handlerton *hton, THD *thd,
895                                   enum_binlog_func fn,
896                                   void *arg)
897 {
898   DBUG_ENTER("ndbcluster_binlog_func");
899   int res= 0;
900   switch(fn)
901   {
902   case BFN_RESET_LOGS:
903     res= ndbcluster_reset_logs(thd);
904     break;
905   case BFN_RESET_SLAVE:
906     ndbcluster_reset_slave(thd);
907     break;
908   case BFN_BINLOG_WAIT:
909     ndbcluster_binlog_wait(thd);
910     break;
911   case BFN_BINLOG_END:
912     res= ndbcluster_binlog_end(thd);
913     break;
914   case BFN_BINLOG_PURGE_FILE:
915     res= ndbcluster_binlog_index_purge_file(thd, (const char *)arg);
916     break;
917   }
918   DBUG_RETURN(res);
919 }
920 
ndbcluster_binlog_init(handlerton * h)921 void ndbcluster_binlog_init(handlerton* h)
922 {
923   h->binlog_func=      ndbcluster_binlog_func;
924   h->binlog_log_query= ndbcluster_binlog_log_query;
925 }
926 
927 
928 /*
929   Convert db and table name into a key to use for searching
930   the ndbcluster_open_tables hash
931 */
932 static size_t
ndb_open_tables__create_key(char * key_buf,size_t key_buf_length,const char * db,size_t db_length,const char * table,size_t table_length)933 ndb_open_tables__create_key(char* key_buf, size_t key_buf_length,
934                             const char* db, size_t db_length,
935                             const char* table, size_t table_length)
936 {
937   size_t key_length =  my_snprintf(key_buf, key_buf_length,
938                                    "./%*s/%*s", db_length, db,
939                                    table_length, table) - 1;
940   assert(key_length > 0);
941   assert(key_length < key_buf_length);
942 
943   return key_length;
944 }
945 
946 
947 /*
948   Check if table with given name is open, ie. is
949   in ndbcluster_open_tables hash
950 */
951 static bool
ndb_open_tables__is_table_open(const char * db,size_t db_length,const char * table,size_t table_length)952 ndb_open_tables__is_table_open(const char* db, size_t db_length,
953                                const char* table, size_t table_length)
954 {
955   char key[FN_REFLEN + 1];
956   size_t key_length = ndb_open_tables__create_key(key, sizeof(key),
957                                                   db, db_length,
958                                                   table, table_length);
959   DBUG_ENTER("ndb_open_tables__is_table_open");
960   DBUG_PRINT("enter", ("db: '%s', table: '%s', key: '%s'",
961                        db, table, key));
962 
963   native_mutex_lock(&ndbcluster_mutex);
964   bool result = my_hash_search(&ndbcluster_open_tables,
965                                (const uchar*)key,
966                                key_length) != NULL;
967   native_mutex_unlock(&ndbcluster_mutex);
968 
969   DBUG_PRINT("exit", ("result: %d", result));
970   DBUG_RETURN(result);
971 }
972 
973 
974 static bool
ndbcluster_check_ndb_schema_share()975 ndbcluster_check_ndb_schema_share()
976 {
977   return ndb_open_tables__is_table_open(STRING_WITH_LEN("mysql"),
978                                         STRING_WITH_LEN("ndb_schema"));
979 }
980 
981 
982 static bool
ndbcluster_check_ndb_apply_status_share()983 ndbcluster_check_ndb_apply_status_share()
984 {
985   return ndb_open_tables__is_table_open(STRING_WITH_LEN("mysql"),
986                                         STRING_WITH_LEN("ndb_apply_status"));
987 }
988 
989 
990 static bool
create_cluster_sys_table(THD * thd,const char * db,size_t db_length,const char * table,size_t table_length,const char * create_definitions,const char * create_options)991 create_cluster_sys_table(THD *thd, const char* db, size_t db_length,
992                          const char* table, size_t table_length,
993                          const char* create_definitions,
994                          const char* create_options)
995 {
996   if (ndb_open_tables__is_table_open(db, db_length, table, table_length))
997     return false;
998 
999   if (g_ndb_cluster_connection->get_no_ready() <= 0)
1000     return false;
1001 
1002   if (opt_ndb_extra_logging)
1003     sql_print_information("NDB: Creating %s.%s", db, table);
1004 
1005   Ndb_local_connection mysqld(thd);
1006 
1007   /*
1008     Check if table exists in MySQL "dictionary"(i.e on disk)
1009     if so, remove it since there is none in Ndb
1010   */
1011   {
1012     char path[FN_REFLEN + 1];
1013     build_table_filename(path, sizeof(path) - 1,
1014                          db, table, reg_ext, 0);
1015     if (my_delete(path, MYF(0)) == 0)
1016     {
1017       /*
1018         The .frm file existed and was deleted from disk.
1019         It's possible that someone has tried to use it and thus
1020         it might have been inserted in the table definition cache.
1021         It must be flushed to avoid that it exist only in the
1022         table definition cache.
1023       */
1024       if (opt_ndb_extra_logging)
1025         sql_print_information("NDB: Flushing %s.%s", db, table);
1026 
1027       /* Flush mysql.ndb_apply_status table, ignore all errors */
1028       (void)mysqld.flush_table(db, db_length,
1029                                table, table_length);
1030     }
1031   }
1032 
1033   const bool create_if_not_exists = true;
1034   const bool res = mysqld.create_sys_table(db, db_length,
1035                                            table, table_length,
1036                                            create_if_not_exists,
1037                                            create_definitions,
1038                                            create_options);
1039   return res;
1040 }
1041 
1042 
1043 static bool
ndb_apply_table__create(THD * thd)1044 ndb_apply_table__create(THD *thd)
1045 {
1046   DBUG_ENTER("ndb_apply_table__create");
1047 
1048   /* NOTE! Updating this table schema must be reflected in ndb_restore */
1049   const bool res =
1050     create_cluster_sys_table(thd,
1051                              STRING_WITH_LEN("mysql"),
1052                              STRING_WITH_LEN("ndb_apply_status"),
1053                              // table_definition
1054                              "server_id INT UNSIGNED NOT NULL,"
1055                              "epoch BIGINT UNSIGNED NOT NULL, "
1056                              "log_name VARCHAR(255) BINARY NOT NULL, "
1057                              "start_pos BIGINT UNSIGNED NOT NULL, "
1058                              "end_pos BIGINT UNSIGNED NOT NULL, "
1059                              "PRIMARY KEY USING HASH (server_id)",
1060                              // table_options
1061                              "ENGINE=NDB CHARACTER SET latin1");
1062   DBUG_RETURN(res);
1063 }
1064 
1065 
1066 static bool
ndb_schema_table__create(THD * thd)1067 ndb_schema_table__create(THD *thd)
1068 {
1069   DBUG_ENTER("ndb_schema_table__create");
1070 
1071   /* NOTE! Updating this table schema must be reflected in ndb_restore */
1072   const bool res =
1073     create_cluster_sys_table(thd,
1074                              STRING_WITH_LEN("mysql"),
1075                              STRING_WITH_LEN("ndb_schema"),
1076                              // table_definition
1077                              "db VARBINARY("
1078                              NDB_MAX_DDL_NAME_BYTESIZE_STR
1079                              ") NOT NULL,"
1080                              "name VARBINARY("
1081                              NDB_MAX_DDL_NAME_BYTESIZE_STR
1082                              ") NOT NULL,"
1083                              "slock BINARY(32) NOT NULL,"
1084                              "query BLOB NOT NULL,"
1085                              "node_id INT UNSIGNED NOT NULL,"
1086                              "epoch BIGINT UNSIGNED NOT NULL,"
1087                              "id INT UNSIGNED NOT NULL,"
1088                              "version INT UNSIGNED NOT NULL,"
1089                              "type INT UNSIGNED NOT NULL,"
1090                              "PRIMARY KEY USING HASH (db,name)",
1091                              // table_options
1092                              "ENGINE=NDB CHARACTER SET latin1");
1093   DBUG_RETURN(res);
1094 }
1095 
1096 class Thd_ndb_options_guard
1097 {
1098 public:
Thd_ndb_options_guard(Thd_ndb * thd_ndb)1099   Thd_ndb_options_guard(Thd_ndb *thd_ndb)
1100     : m_val(thd_ndb->options), m_save_val(thd_ndb->options) {}
~Thd_ndb_options_guard()1101   ~Thd_ndb_options_guard() { m_val= m_save_val; }
set(uint32 flag)1102   void set(uint32 flag) { m_val|= flag; }
1103 private:
1104   uint32 &m_val;
1105   uint32 m_save_val;
1106 };
1107 
1108 extern int ndb_setup_complete;
1109 extern native_cond_t COND_ndb_setup_complete;
1110 
1111 /*
1112    ndb_notify_tables_writable
1113 
1114    Called to notify any waiting threads that Ndb tables are
1115    now writable
1116 */
ndb_notify_tables_writable()1117 static void ndb_notify_tables_writable()
1118 {
1119   native_mutex_lock(&ndbcluster_mutex);
1120   ndb_setup_complete= 1;
1121   native_cond_broadcast(&COND_ndb_setup_complete);
1122   native_mutex_unlock(&ndbcluster_mutex);
1123 }
1124 
1125 
1126 /*
1127   Clean-up any stray files for non-existing NDB tables
1128   - "stray" means that there is a .frm + .ndb file on disk
1129     but there exists no such table in NDB. The two files
1130     can then be deleted from disk to get in synch with
1131     what's in NDB.
1132 */
1133 static
clean_away_stray_files(THD * thd)1134 void clean_away_stray_files(THD *thd)
1135 {
1136   DBUG_ENTER("clean_away_stray_files");
1137 
1138   // Populate list of databases
1139   Ndb_find_files_list db_names(thd);
1140   if (!db_names.find_databases(mysql_data_home))
1141   {
1142     thd->clear_error();
1143     DBUG_PRINT("info", ("Failed to find databases"));
1144     DBUG_VOID_RETURN;
1145   }
1146 
1147   LEX_STRING *db_name;
1148   while ((db_name= db_names.next()))
1149   {
1150     DBUG_PRINT("info", ("Found database %s", db_name->str));
1151     if (strcmp(NDB_REP_DB, db_name->str)) /* Skip system database */
1152     {
1153 
1154       sql_print_information("NDB: Cleaning stray tables from database '%s'",
1155                             db_name->str);
1156 
1157       char path[FN_REFLEN + 1];
1158       build_table_filename(path, sizeof(path) - 1, db_name->str, "", "", 0);
1159 
1160       /* Require that no binlog setup is attempted yet, that will come later
1161        * right now we just want to get rid of stray frms et al
1162        */
1163 
1164       Thd_ndb *thd_ndb= get_thd_ndb(thd);
1165       thd_ndb->set_skip_binlog_setup_in_find_files(true);
1166       Ndb_find_files_list tab_names(thd);
1167       if (!tab_names.find_tables(db_name->str, path))
1168       {
1169         thd->clear_error();
1170         DBUG_PRINT("info", ("Failed to find tables"));
1171       }
1172       thd_ndb->set_skip_binlog_setup_in_find_files(false);
1173     }
1174   }
1175   DBUG_VOID_RETURN;
1176 }
1177 
1178 /*
1179   Ndb has no representation of the database schema objects.
1180   The mysql.ndb_schema table contains the latest schema operations
1181   done via a mysqld, and thus reflects databases created/dropped/altered
1182   while a mysqld was disconnected.  This function tries to recover
1183   the correct state w.r.t created databases using the information in
1184   that table.
1185 
1186 
1187 */
ndbcluster_find_all_databases(THD * thd)1188 static int ndbcluster_find_all_databases(THD *thd)
1189 {
1190   Ndb *ndb= check_ndb_in_thd(thd);
1191   Thd_ndb *thd_ndb= get_thd_ndb(thd);
1192   Thd_ndb_options_guard thd_ndb_options(thd_ndb);
1193   NDBDICT *dict= ndb->getDictionary();
1194   NdbTransaction *trans= NULL;
1195   NdbError ndb_error;
1196   int retries= 100;
1197   int retry_sleep= 30; /* 30 milliseconds, transaction */
1198   DBUG_ENTER("ndbcluster_find_all_databases");
1199 
1200   /*
1201     Function should only be called while ndbcluster_global_schema_lock
1202     is held, to ensure that ndb_schema table is not being updated while
1203     scanning.
1204   */
1205   if (!thd_ndb->has_required_global_schema_lock("ndbcluster_find_all_databases"))
1206     DBUG_RETURN(1);
1207 
1208   ndb->setDatabaseName(NDB_REP_DB);
1209   thd_ndb_options.set(TNO_NO_LOG_SCHEMA_OP);
1210   thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
1211   while (1)
1212   {
1213     char db_buffer[FN_REFLEN];
1214     char *db= db_buffer+1;
1215     char name[FN_REFLEN];
1216     char query[64000];
1217     Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
1218     const NDBTAB *ndbtab= ndbtab_g.get_table();
1219     NdbScanOperation *op;
1220     NdbBlob *query_blob_handle;
1221     int r= 0;
1222     if (ndbtab == NULL)
1223     {
1224       ndb_error= dict->getNdbError();
1225       goto error;
1226     }
1227     trans= ndb->startTransaction();
1228     if (trans == NULL)
1229     {
1230       ndb_error= ndb->getNdbError();
1231       goto error;
1232     }
1233     op= trans->getNdbScanOperation(ndbtab);
1234     if (op == NULL)
1235     {
1236       ndb_error= trans->getNdbError();
1237       goto error;
1238     }
1239 
1240     op->readTuples(NdbScanOperation::LM_Read,
1241                    NdbScanOperation::SF_TupScan, 1);
1242 
1243     r|= op->getValue("db", db_buffer) == NULL;
1244     r|= op->getValue("name", name) == NULL;
1245     r|= (query_blob_handle= op->getBlobHandle("query")) == NULL;
1246     r|= query_blob_handle->getValue(query, sizeof(query));
1247 
1248     if (r)
1249     {
1250       ndb_error= op->getNdbError();
1251       goto error;
1252     }
1253 
1254     if (trans->execute(NdbTransaction::NoCommit))
1255     {
1256       ndb_error= trans->getNdbError();
1257       goto error;
1258     }
1259 
1260     while ((r= op->nextResult()) == 0)
1261     {
1262       unsigned db_len= db_buffer[0];
1263       unsigned name_len= name[0];
1264       /*
1265         name_len == 0 means no table name, hence the row
1266         is for a database
1267       */
1268       if (db_len > 0 && name_len == 0)
1269       {
1270         /* database found */
1271         db[db_len]= 0;
1272 
1273 	/* find query */
1274         Uint64 query_length= 0;
1275         if (query_blob_handle->getLength(query_length))
1276         {
1277           ndb_error= query_blob_handle->getNdbError();
1278           goto error;
1279         }
1280         query[query_length]= 0;
1281         build_table_filename(name, sizeof(name), db, "", "", 0);
1282         int database_exists= !my_access(name, F_OK);
1283         if (native_strncasecmp("CREATE", query, 6) == 0)
1284         {
1285           /* Database should exist */
1286           if (!database_exists)
1287           {
1288             /* create missing database */
1289             sql_print_information("NDB: Discovered missing database '%s'", db);
1290             const int no_print_error[1]= {0};
1291             run_query(thd, query, query + query_length,
1292                       no_print_error);
1293           }
1294         }
1295         else if (native_strncasecmp("ALTER", query, 5) == 0)
1296         {
1297           /* Database should exist */
1298           if (!database_exists)
1299           {
1300             /* create missing database */
1301             sql_print_information("NDB: Discovered missing database '%s'", db);
1302             const int no_print_error[1]= {0};
1303             name_len= (unsigned)my_snprintf(name, sizeof(name), "CREATE DATABASE %s", db);
1304             run_query(thd, name, name + name_len,
1305                       no_print_error);
1306             run_query(thd, query, query + query_length,
1307                       no_print_error);
1308           }
1309         }
1310         else if (native_strncasecmp("DROP", query, 4) == 0)
1311         {
1312           /* Database should not exist */
1313           if (database_exists)
1314           {
1315             /* drop missing database */
1316             sql_print_information("NDB: Discovered remaining database '%s'", db);
1317           }
1318         }
1319       }
1320     }
1321     if (r == -1)
1322     {
1323       ndb_error= op->getNdbError();
1324       goto error;
1325     }
1326     ndb->closeTransaction(trans);
1327     trans= NULL;
1328     DBUG_RETURN(0); // success
1329   error:
1330     if (trans)
1331     {
1332       ndb->closeTransaction(trans);
1333       trans= NULL;
1334     }
1335     if (ndb_error.status == NdbError::TemporaryError && !thd->killed)
1336     {
1337       if (retries--)
1338       {
1339         sql_print_warning("NDB: ndbcluster_find_all_databases retry: %u - %s",
1340                           ndb_error.code,
1341                           ndb_error.message);
1342         do_retry_sleep(retry_sleep);
1343         continue; // retry
1344       }
1345     }
1346     if (!thd->killed)
1347     {
1348       sql_print_error("NDB: ndbcluster_find_all_databases fail: %u - %s",
1349                       ndb_error.code,
1350                       ndb_error.message);
1351     }
1352 
1353     DBUG_RETURN(1); // not temp error or too many retries
1354   }
1355 }
1356 
1357 
1358 /*
1359   find all tables in ndb and discover those needed
1360 */
1361 static
ndbcluster_find_all_files(THD * thd)1362 int ndbcluster_find_all_files(THD *thd)
1363 {
1364   Ndb* ndb;
1365   char key[FN_REFLEN + 1];
1366   NDBDICT *dict;
1367   int unhandled= 0, retries= 5, skipped= 0;
1368   DBUG_ENTER("ndbcluster_find_all_files");
1369 
1370   if (!(ndb= check_ndb_in_thd(thd)))
1371     DBUG_RETURN(HA_ERR_NO_CONNECTION);
1372 
1373   dict= ndb->getDictionary();
1374 
1375   do
1376   {
1377     NdbDictionary::Dictionary::List list;
1378     if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
1379       DBUG_RETURN(1);
1380     unhandled= 0;
1381     skipped= 0;
1382     retries--;
1383     for (uint i= 0 ; i < list.count ; i++)
1384     {
1385       NDBDICT::List::Element& elmt= list.elements[i];
1386       if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name))
1387       {
1388         DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name));
1389         continue;
1390       }
1391       DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
1392       if (elmt.state != NDBOBJ::StateOnline &&
1393           elmt.state != NDBOBJ::StateBackup &&
1394           elmt.state != NDBOBJ::StateBuilding)
1395       {
1396         sql_print_information("NDB: skipping setup table %s.%s, in state %d",
1397                               elmt.database, elmt.name, elmt.state);
1398         skipped++;
1399         continue;
1400       }
1401 
1402       ndb->setDatabaseName(elmt.database);
1403       Ndb_table_guard ndbtab_g(dict, elmt.name);
1404       const NDBTAB *ndbtab= ndbtab_g.get_table();
1405       if (!ndbtab)
1406       {
1407         if (retries == 0)
1408           sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
1409                           elmt.database, elmt.name,
1410                           dict->getNdbError().code,
1411                           dict->getNdbError().message);
1412         unhandled++;
1413         continue;
1414       }
1415 
1416       if (ndbtab->getFrmLength() == 0)
1417         continue;
1418 
1419       /* check if database exists */
1420       char *end= key +
1421         build_table_filename(key, sizeof(key) - 1, elmt.database, "", "", 0);
1422       if (my_access(key, F_OK))
1423       {
1424         /* no such database defined, skip table */
1425         continue;
1426       }
1427       /* finalize construction of path */
1428       end+= tablename_to_filename(elmt.name, end,
1429                                   (uint)(sizeof(key)-(end-key)));
1430       uchar *data= 0, *pack_data= 0;
1431       size_t length, pack_length;
1432       int discover= 0;
1433       if (readfrm(key, &data, &length) ||
1434           packfrm(data, length, &pack_data, &pack_length))
1435       {
1436         discover= 1;
1437         sql_print_information("NDB: missing frm for %s.%s, discovering...",
1438                               elmt.database, elmt.name);
1439       }
1440       else if (cmp_frm(ndbtab, pack_data, pack_length))
1441       {
1442         /* ndb_share reference temporary */
1443         NDB_SHARE *share= get_share(key, 0, FALSE);
1444         if (share)
1445         {
1446           DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
1447                                    share->key_string(), share->use_count));
1448         }
1449         if (!share || get_ndb_share_state(share) != NSS_ALTERED)
1450         {
1451           discover= 1;
1452           sql_print_information("NDB: mismatch in frm for %s.%s,"
1453                                 " discovering...",
1454                                 elmt.database, elmt.name);
1455         }
1456         if (share)
1457         {
1458           /* ndb_share reference temporary free */
1459           DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
1460                                    share->key_string(), share->use_count));
1461           free_share(&share);
1462         }
1463       }
1464       my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
1465       my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));
1466 
1467       if (discover)
1468       {
1469         /* ToDo 4.1 database needs to be created if missing */
1470         if (ndb_create_table_from_engine(thd, elmt.database, elmt.name))
1471         {
1472           /* ToDo 4.1 handle error */
1473         }
1474       }
1475       else
1476       {
1477         /* set up replication for this table */
1478         ndbcluster_create_binlog_setup(thd, ndb, key, (uint)(end-key),
1479                                        elmt.database, elmt.name,
1480                                        0);
1481       }
1482     }
1483   }
1484   while (unhandled && retries);
1485 
1486   DBUG_RETURN(-(skipped + unhandled));
1487 }
1488 
1489 
1490 bool
ndb_binlog_setup(THD * thd)1491 ndb_binlog_setup(THD *thd)
1492 {
1493   if (ndb_binlog_tables_inited)
1494     return true; // Already setup -> OK
1495 
1496   /*
1497     Can't proceed unless ndb binlog thread has setup
1498     the schema_ndb pointer(since that pointer is used for
1499     creating the event operations owned by ndb_schema_share)
1500   */
1501   native_mutex_lock(&injector_mutex);
1502   if (!schema_ndb)
1503   {
1504     native_mutex_unlock(&injector_mutex);
1505     return false;
1506   }
1507   native_mutex_unlock(&injector_mutex);
1508 
1509   /*
1510     Take the global schema lock to make sure that
1511     the schema is not changed in the cluster while
1512     running setup.
1513   */
1514   Ndb_global_schema_lock_guard global_schema_lock_guard(thd);
1515   if (global_schema_lock_guard.lock(false, false))
1516     return false;
1517 
1518   if (!ndb_schema_share &&
1519       ndbcluster_check_ndb_schema_share() == 0)
1520   {
1521     ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
1522     if (!ndb_schema_share)
1523     {
1524       ndb_schema_table__create(thd);
1525       // always make sure we create the 'schema' first
1526       if (!ndb_schema_share)
1527         return false;
1528     }
1529   }
1530   if (!ndb_apply_status_share &&
1531       ndbcluster_check_ndb_apply_status_share() == 0)
1532   {
1533     ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
1534     if (!ndb_apply_status_share)
1535     {
1536       ndb_apply_table__create(thd);
1537       if (!ndb_apply_status_share)
1538         return false;
1539     }
1540   }
1541 
1542   clean_away_stray_files(thd);
1543 
1544   if (ndbcluster_find_all_databases(thd))
1545   {
1546     return false;
1547   }
1548 
1549   if (ndbcluster_find_all_files(thd))
1550   {
1551     return false;
1552   }
1553 
1554   ndb_binlog_tables_inited= TRUE;
1555 
1556   if (ndb_binlog_running && ndb_binlog_is_ready)
1557   {
1558     if (opt_ndb_extra_logging)
1559       sql_print_information("NDB Binlog: ndb tables writable");
1560 
1561     ndb_tdc_close_cached_tables();
1562 
1563     /*
1564        Signal any waiting thread that ndb table setup is
1565        now complete
1566     */
1567     ndb_notify_tables_writable();
1568   }
1569 
1570   /* Signal injector thread that all is setup */
1571   native_cond_signal(&injector_cond);
1572 
1573   return true; // Setup completed -> OK
1574 }
1575 
1576 /*
1577   Defines and struct for schema table.
1578   Should reflect table definition above.
1579 */
1580 #define SCHEMA_DB_I 0u
1581 #define SCHEMA_NAME_I 1u
1582 #define SCHEMA_SLOCK_I 2u
1583 #define SCHEMA_QUERY_I 3u
1584 #define SCHEMA_NODE_ID_I 4u
1585 #define SCHEMA_EPOCH_I 5u
1586 #define SCHEMA_ID_I 6u
1587 #define SCHEMA_VERSION_I 7u
1588 #define SCHEMA_TYPE_I 8u
1589 #define SCHEMA_SIZE 9u
1590 #define SCHEMA_SLOCK_SIZE 32u
1591 
1592 
1593 /*
1594   log query in schema table
1595 */
ndb_report_waiting(const char * key,int the_time,const char * op,const char * obj,const MY_BITMAP * map)1596 static void ndb_report_waiting(const char *key,
1597                                int the_time,
1598                                const char *op,
1599                                const char *obj,
1600                                const MY_BITMAP * map)
1601 {
1602   ulonglong ndb_latest_epoch= 0;
1603   const char *proc_info= "<no info>";
1604   native_mutex_lock(&injector_mutex);
1605   if (injector_ndb)
1606     ndb_latest_epoch= injector_ndb->getLatestGCI();
1607   if (injector_thd)
1608     proc_info= injector_thd->proc_info;
1609   native_mutex_unlock(&injector_mutex);
1610   if (map == 0)
1611   {
1612     sql_print_information("NDB %s:"
1613                           " waiting max %u sec for %s %s."
1614                           "  epochs: (%u/%u,%u/%u,%u/%u)"
1615                           "  injector proc_info: %s"
1616                           ,key, the_time, op, obj
1617                           ,(uint)(ndb_latest_handled_binlog_epoch >> 32)
1618                           ,(uint)(ndb_latest_handled_binlog_epoch)
1619                           ,(uint)(ndb_latest_received_binlog_epoch >> 32)
1620                           ,(uint)(ndb_latest_received_binlog_epoch)
1621                           ,(uint)(ndb_latest_epoch >> 32)
1622                           ,(uint)(ndb_latest_epoch)
1623                           ,proc_info
1624                           );
1625   }
1626   else
1627   {
1628     sql_print_information("NDB %s:"
1629                           " waiting max %u sec for %s %s."
1630                           "  epochs: (%u/%u,%u/%u,%u/%u)"
1631                           "  injector proc_info: %s map: %x%x"
1632                           ,key, the_time, op, obj
1633                           ,(uint)(ndb_latest_handled_binlog_epoch >> 32)
1634                           ,(uint)(ndb_latest_handled_binlog_epoch)
1635                           ,(uint)(ndb_latest_received_binlog_epoch >> 32)
1636                           ,(uint)(ndb_latest_received_binlog_epoch)
1637                           ,(uint)(ndb_latest_epoch >> 32)
1638                           ,(uint)(ndb_latest_epoch)
1639                           ,proc_info
1640                           ,map->bitmap[0]
1641                           ,map->bitmap[1]
1642                           );
1643   }
1644 }
1645 
1646 
1647 extern void update_slave_api_stats(Ndb*);
1648 
ndbcluster_log_schema_op(THD * thd,const char * query,int query_length,const char * db,const char * table_name,uint32 ndb_table_id,uint32 ndb_table_version,enum SCHEMA_OP_TYPE type,const char * new_db,const char * new_table_name,bool log_query_on_participant)1649 int ndbcluster_log_schema_op(THD *thd,
1650                              const char *query, int query_length,
1651                              const char *db, const char *table_name,
1652                              uint32 ndb_table_id,
1653                              uint32 ndb_table_version,
1654                              enum SCHEMA_OP_TYPE type,
1655                              const char *new_db, const char *new_table_name,
1656                              bool log_query_on_participant)
1657 {
1658   DBUG_ENTER("ndbcluster_log_schema_op");
1659   Thd_ndb *thd_ndb= get_thd_ndb(thd);
1660   if (!thd_ndb)
1661   {
1662     if (!(thd_ndb= Thd_ndb::seize(thd)))
1663     {
1664       sql_print_error("Could not allocate Thd_ndb object");
1665       DBUG_RETURN(1);
1666     }
1667     thd_set_thd_ndb(thd, thd_ndb);
1668   }
1669 
1670   DBUG_PRINT("enter",
1671              ("query: %s  db: %s  table_name: %s  thd_ndb->options: %d",
1672               query, db, table_name, thd_ndb->options));
1673   if (!ndb_schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
1674   {
1675     if (thd->slave_thread)
1676       update_slave_api_stats(thd_ndb->ndb);
1677 
1678     DBUG_RETURN(0);
1679   }
1680 
1681   /* Check that the database name will fit within limits */
1682   if(strlen(db) > NDB_MAX_DDL_NAME_BYTESIZE)
1683   {
1684     // Catch unexpected commands with too long db length
1685     assert(type == SOT_CREATE_DB ||
1686            type == SOT_ALTER_DB ||
1687            type == SOT_DROP_DB);
1688     push_warning_printf(thd, Sql_condition::SL_WARNING,
1689                         ER_TOO_LONG_IDENT,
1690                         "Ndb has an internal limit of %u bytes on the size of schema identifiers",
1691                         NDB_MAX_DDL_NAME_BYTESIZE);
1692     DBUG_RETURN(ER_TOO_LONG_IDENT);
1693   }
1694 
1695   char tmp_buf2[FN_REFLEN];
1696   char quoted_table1[2 + 2 * FN_REFLEN + 1];
1697   char quoted_db1[2 + 2 * FN_REFLEN + 1];
1698   char quoted_db2[2 + 2 * FN_REFLEN + 1];
1699   char quoted_table2[2 + 2 * FN_REFLEN + 1];
1700   size_t id_length= 0;
1701   const char *type_str;
1702   uint32 log_type= (uint32)type;
1703   switch (type)
1704   {
1705   case SOT_DROP_TABLE:
1706     /* drop database command, do not log at drop table */
1707     if (thd->lex->sql_command ==  SQLCOM_DROP_DB)
1708       DBUG_RETURN(0);
1709     /*
1710       Rewrite the drop table query as it may contain several tables
1711       but drop_table() is called once for each table in the query
1712       ie. DROP TABLE t1, t2;
1713           -> DROP TABLE t1 + DROP TABLE t2
1714     */
1715 
1716     query= tmp_buf2;
1717     id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
1718                                             table_name, 0);
1719     quoted_table1[id_length]= '\0';
1720     id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db1,
1721                                             db, 0);
1722     quoted_db1[id_length]= '\0';
1723     query_length= (uint) (strxmov(tmp_buf2, "drop table ", quoted_db1, ".",
1724                                   quoted_table1, NullS) - tmp_buf2);
1725     type_str= "drop table";
1726     break;
1727   case SOT_RENAME_TABLE_PREPARE:
1728     type_str= "rename table prepare";
1729     break;
1730   case SOT_RENAME_TABLE:
1731     /*
1732       Rewrite the rename table query as it may contain several tables
1733       but rename_table() is called once for each table in the query
1734       ie. RENAME TABLE t1 to tx, t2 to ty;
1735           -> RENAME TABLE t1 to tx + RENAME TABLE t2 to ty
1736     */
1737     query= tmp_buf2;
1738     id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db1,
1739                                             db, 0);
1740     quoted_db1[id_length]= '\0';
1741     id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
1742                                             table_name, 0);
1743     quoted_table1[id_length]= '\0';
1744     id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db2,
1745                                             new_db, 0);
1746     quoted_db2[id_length]= '\0';
1747     id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table2,
1748                                             new_table_name, 0);
1749     quoted_table2[id_length]= '\0';
1750     query_length= (uint) (strxmov(tmp_buf2, "rename table ",
1751                                   quoted_db1, ".", quoted_table1, " to ",
1752                                   quoted_db2, ".", quoted_table2, NullS) - tmp_buf2);
1753     type_str= "rename table";
1754     break;
1755   case SOT_CREATE_TABLE:
1756     type_str= "create table";
1757     break;
1758   case SOT_ALTER_TABLE_COMMIT:
1759     type_str= "alter table";
1760     break;
1761   case SOT_ONLINE_ALTER_TABLE_PREPARE:
1762     type_str= "online alter table prepare";
1763     break;
1764   case SOT_ONLINE_ALTER_TABLE_COMMIT:
1765     type_str= "online alter table commit";
1766     break;
1767   case SOT_DROP_DB:
1768     type_str= "drop db";
1769     break;
1770   case SOT_CREATE_DB:
1771     type_str= "create db";
1772     break;
1773   case SOT_ALTER_DB:
1774     type_str= "alter db";
1775     break;
1776   case SOT_TABLESPACE:
1777     type_str= "tablespace";
1778     break;
1779   case SOT_LOGFILE_GROUP:
1780     type_str= "logfile group";
1781     break;
1782   case SOT_TRUNCATE_TABLE:
1783     type_str= "truncate table";
1784     break;
1785   case SOT_CREATE_USER:
1786     type_str= "create user";
1787     break;
1788   case SOT_DROP_USER:
1789     type_str= "drop user";
1790     break;
1791   case SOT_RENAME_USER:
1792     type_str= "rename user";
1793     break;
1794   case SOT_GRANT:
1795     type_str= "grant/revoke";
1796     break;
1797   case SOT_REVOKE:
1798     type_str= "revoke all";
1799     break;
1800   default:
1801     abort(); /* should not happen, programming error */
1802   }
1803 
1804   NDB_SCHEMA_OBJECT *ndb_schema_object;
1805   {
1806     char key[FN_REFLEN + 1];
1807     build_table_filename(key, sizeof(key) - 1, db, table_name, "", 0);
1808     ndb_schema_object= ndb_get_schema_object(key, true);
1809     ndb_schema_object->table_id= ndb_table_id;
1810     ndb_schema_object->table_version= ndb_table_version;
1811   }
1812 
1813   const NdbError *ndb_error= 0;
1814   // Use nodeid of the primary cluster connection since that is
1815   // the nodeid which the coordinator and participants listen to
1816   const uint32 node_id= g_ndb_cluster_connection->node_id();
1817   Uint64 epoch= 0;
1818   {
1819     /* begin protect ndb_schema_share */
1820     native_mutex_lock(&ndb_schema_share_mutex);
1821     if (ndb_schema_share == 0)
1822     {
1823       native_mutex_unlock(&ndb_schema_share_mutex);
1824       ndb_free_schema_object(&ndb_schema_object);
1825       DBUG_RETURN(0);
1826     }
1827     native_mutex_unlock(&ndb_schema_share_mutex);
1828   }
1829 
1830   Ndb *ndb= thd_ndb->ndb;
1831   char save_db[FN_REFLEN];
1832   strcpy(save_db, ndb->getDatabaseName());
1833 
1834   char tmp_buf[FN_REFLEN];
1835   NDBDICT *dict= ndb->getDictionary();
1836   ndb->setDatabaseName(NDB_REP_DB);
1837   Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
1838   const NDBTAB *ndbtab= ndbtab_g.get_table();
1839   NdbTransaction *trans= 0;
1840   int retries= 100;
1841   int retry_sleep= 30; /* 30 milliseconds, transaction */
1842   const NDBCOL *col[SCHEMA_SIZE];
1843   unsigned sz[SCHEMA_SIZE];
1844 
1845   if (ndbtab == 0)
1846   {
1847     if (strcmp(NDB_REP_DB, db) != 0 ||
1848         strcmp(NDB_SCHEMA_TABLE, table_name))
1849     {
1850       ndb_error= &dict->getNdbError();
1851     }
1852     goto end;
1853   }
1854 
1855   {
1856     uint i;
1857     for (i= 0; i < SCHEMA_SIZE; i++)
1858     {
1859       col[i]= ndbtab->getColumn(i);
1860       if (i != SCHEMA_QUERY_I)
1861       {
1862         sz[i]= col[i]->getLength();
1863         assert(sz[i] <= sizeof(tmp_buf));
1864       }
1865     }
1866   }
1867 
1868   while (1)
1869   {
1870     const char *log_db= db;
1871     const char *log_tab= table_name;
1872     const char *log_subscribers= (char*)ndb_schema_object->slock;
1873     if ((trans= ndb->startTransaction()) == 0)
1874       goto err;
1875     while (1)
1876     {
1877       NdbOperation *op= 0;
1878       int r= 0;
1879       r|= (op= trans->getNdbOperation(ndbtab)) == 0;
1880       assert(r == 0);
1881       r|= op->writeTuple();
1882       assert(r == 0);
1883 
1884       /* db */
1885       ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, (int)strlen(log_db));
1886       r|= op->equal(SCHEMA_DB_I, tmp_buf);
1887       assert(r == 0);
1888       /* name */
1889       ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
1890                        (int)strlen(log_tab));
1891       r|= op->equal(SCHEMA_NAME_I, tmp_buf);
1892       assert(r == 0);
1893       /* slock */
1894       assert(sz[SCHEMA_SLOCK_I] ==
1895              no_bytes_in_map(&ndb_schema_object->slock_bitmap));
1896       r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
1897       assert(r == 0);
1898       /* query */
1899       {
1900         NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
1901         assert(ndb_blob != 0);
1902         uint blob_len= query_length;
1903         const char* blob_ptr= query;
1904         r|= ndb_blob->setValue(blob_ptr, blob_len);
1905         assert(r == 0);
1906       }
1907       /* node_id */
1908       r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
1909       assert(r == 0);
1910       /* epoch */
1911       r|= op->setValue(SCHEMA_EPOCH_I, epoch);
1912       assert(r == 0);
1913       /* id */
1914       r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
1915       assert(r == 0);
1916       /* version */
1917       r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
1918       assert(r == 0);
1919       /* type */
1920       r|= op->setValue(SCHEMA_TYPE_I, log_type);
1921       assert(r == 0);
1922       /* any value */
1923       Uint32 anyValue = 0;
1924       if (! thd->slave_thread)
1925       {
1926         /* Schema change originating from this MySQLD, check SQL_LOG_BIN
1927          * variable and pass 'setting' to all logging MySQLDs via AnyValue
1928          */
1929         if (thd_options(thd) & OPTION_BIN_LOG) /* e.g. SQL_LOG_BIN == on */
1930         {
1931           DBUG_PRINT("info", ("Schema event for binlogging"));
1932           ndbcluster_anyvalue_set_normal(anyValue);
1933         }
1934         else
1935         {
1936           DBUG_PRINT("info", ("Schema event not for binlogging"));
1937           ndbcluster_anyvalue_set_nologging(anyValue);
1938         }
1939 
1940         if(!log_query_on_participant)
1941         {
1942           DBUG_PRINT("info", ("Forcing query not to be binlogged on participant"));
1943           ndbcluster_anyvalue_set_nologging(anyValue);
1944         }
1945       }
1946       else
1947       {
1948         /*
1949            Slave propagating replicated schema event in ndb_schema
1950            In case replicated serverId is composite
1951            (server-id-bits < 31) we copy it into the
1952            AnyValue as-is
1953            This is for 'future', as currently Schema operations
1954            do not have composite AnyValues.
1955            In future it may be useful to support *not* mapping composite
1956            AnyValues to/from Binlogged server-ids.
1957         */
1958         DBUG_PRINT("info", ("Replicated schema event with original server id %d",
1959                             thd->server_id));
1960         anyValue = thd_unmasked_server_id(thd);
1961       }
1962 
1963 #ifndef NDEBUG
1964       /*
1965         MySQLD will set the user-portion of AnyValue (if any) to all 1s
1966         This tests code filtering ServerIds on the value of server-id-bits.
1967       */
1968       const char* p = getenv("NDB_TEST_ANYVALUE_USERDATA");
1969       if (p != 0  && *p != 0 && *p != '0' && *p != 'n' && *p != 'N')
1970       {
1971         dbug_ndbcluster_anyvalue_set_userbits(anyValue);
1972       }
1973 #endif
1974       r|= op->setAnyValue(anyValue);
1975       assert(r == 0);
1976       break;
1977     }
1978     if (trans->execute(NdbTransaction::Commit, NdbOperation::DefaultAbortOption,
1979                        1 /* force send */) == 0)
1980     {
1981       DBUG_PRINT("info", ("logged: %s", query));
1982       dict->forceGCPWait(1);
1983       break;
1984     }
1985 err:
1986     const NdbError *this_error= trans ?
1987       &trans->getNdbError() : &ndb->getNdbError();
1988     if (this_error->status == NdbError::TemporaryError && !thd->killed)
1989     {
1990       if (retries--)
1991       {
1992         if (trans)
1993           ndb->closeTransaction(trans);
1994         do_retry_sleep(retry_sleep);
1995         continue; // retry
1996       }
1997     }
1998     ndb_error= this_error;
1999     break;
2000   }
2001 end:
2002   if (ndb_error)
2003     push_warning_printf(thd, Sql_condition::SL_WARNING,
2004                         ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2005                         ndb_error->code,
2006                         ndb_error->message,
2007                         "Could not log query '%s' on other mysqld's");
2008 
2009   if (trans)
2010     ndb->closeTransaction(trans);
2011   ndb->setDatabaseName(save_db);
2012 
2013   if (opt_ndb_extra_logging > 19)
2014   {
2015     sql_print_information("NDB: distributed %s.%s(%u/%u) type: %s(%u) query: \'%s\' to %x%x",
2016                           db,
2017                           table_name,
2018                           ndb_table_id,
2019                           ndb_table_version,
2020                           get_schema_type_name(log_type),
2021                           log_type,
2022                           query,
2023                           ndb_schema_object->slock_bitmap.bitmap[0],
2024                           ndb_schema_object->slock_bitmap.bitmap[1]);
2025   }
2026 
2027   /*
2028     Wait for other mysqld's to acknowledge the table operation
2029   */
2030   if (ndb_error == 0 && !bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
2031   {
2032     int max_timeout= DEFAULT_SYNC_TIMEOUT;
2033     native_mutex_lock(&ndb_schema_object->mutex);
2034     while (1)
2035     {
2036       struct timespec abstime;
2037       set_timespec(&abstime, 1);
2038 
2039       // Unlock the schema object and wait for injector to signal that
2040       // something has happened. (NOTE! convoluted in order to
2041       // only use injector_cond with injector_mutex)
2042       native_mutex_unlock(&ndb_schema_object->mutex);
2043       native_mutex_lock(&injector_mutex);
2044       int ret= native_cond_timedwait(&injector_cond,
2045                                       &injector_mutex,
2046                                       &abstime);
2047       native_mutex_unlock(&injector_mutex);
2048       native_mutex_lock(&ndb_schema_object->mutex);
2049 
2050       if (thd->killed)
2051         break;
2052 
2053       /* begin protect ndb_schema_share */
2054       native_mutex_lock(&ndb_schema_share_mutex);
2055       if (ndb_schema_share == 0)
2056       {
2057         native_mutex_unlock(&ndb_schema_share_mutex);
2058         break;
2059       }
2060       native_mutex_unlock(&ndb_schema_share_mutex);
2061       /* end protect ndb_schema_share */
2062 
2063       if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
2064         break;
2065 
2066       if (ret)
2067       {
2068         max_timeout--;
2069         if (max_timeout == 0)
2070         {
2071           sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
2072                           type_str, ndb_schema_object->key);
2073           assert(false);
2074           break;
2075         }
2076         if (opt_ndb_extra_logging)
2077           ndb_report_waiting(type_str, max_timeout,
2078                              "distributing", ndb_schema_object->key,
2079                              &ndb_schema_object->slock_bitmap);
2080       }
2081     }
2082     native_mutex_unlock(&ndb_schema_object->mutex);
2083   }
2084   else if (ndb_error)
2085   {
2086     sql_print_error("NDB %s: distributing %s err: %u",
2087                     type_str, ndb_schema_object->key,
2088                     ndb_error->code);
2089   }
2090   else if (opt_ndb_extra_logging > 19)
2091   {
2092     sql_print_information("NDB %s: not waiting for distributing %s",
2093                           type_str, ndb_schema_object->key);
2094   }
2095 
2096   ndb_free_schema_object(&ndb_schema_object);
2097 
2098   if (opt_ndb_extra_logging > 19)
2099   {
2100     sql_print_information("NDB: distribution of %s.%s(%u/%u) type: %s(%u) query: \'%s\'"
2101                           " - complete!",
2102                           db,
2103                           table_name,
2104                           ndb_table_id,
2105                           ndb_table_version,
2106                           get_schema_type_name(log_type),
2107                           log_type,
2108                           query);
2109   }
2110 
2111   if (thd->slave_thread)
2112     update_slave_api_stats(ndb);
2113 
2114   DBUG_RETURN(0);
2115 }
2116 
2117 
2118 static
2119 int
ndb_handle_schema_change(THD * thd,Ndb * is_ndb,NdbEventOperation * pOp,const Ndb_event_data * event_data)2120 ndb_handle_schema_change(THD *thd, Ndb *is_ndb, NdbEventOperation *pOp,
2121                          const Ndb_event_data *event_data)
2122 {
2123   DBUG_ENTER("ndb_handle_schema_change");
2124   DBUG_PRINT("enter", ("pOp: %p", pOp));
2125 
2126   // Only called for TE_DROP and TE_CLUSTER_FAILURE event
2127   assert(pOp->getEventType() == NDBEVENT::TE_DROP ||
2128          pOp->getEventType() == NDBEVENT::TE_CLUSTER_FAILURE);
2129 
2130   assert(event_data);
2131 
2132   NDB_SHARE *share= event_data->share;
2133   dbug_print_share("changed share: ", share);
2134 
2135   TABLE *shadow_table= event_data->shadow_table;
2136   const char *tabname= shadow_table->s->table_name.str;
2137   const char *dbname= shadow_table->s->db.str;
2138   {
2139     Thd_ndb *thd_ndb= get_thd_ndb(thd);
2140     Ndb *ndb= thd_ndb->ndb;
2141     NDBDICT *dict= ndb->getDictionary();
2142     ndb->setDatabaseName(dbname);
2143     Ndb_table_guard ndbtab_g(dict, tabname);
2144     const NDBTAB *ev_tab= pOp->getTable();
2145     const NDBTAB *cache_tab= ndbtab_g.get_table();
2146     if (cache_tab &&
2147         cache_tab->getObjectId() == ev_tab->getObjectId() &&
2148         cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
2149       ndbtab_g.invalidate();
2150   }
2151 
2152   native_mutex_lock(&share->mutex);
2153   assert(share->state == NSS_DROPPED ||
2154          share->op == pOp || share->new_op == pOp);
2155   if (share->new_op)
2156   {
2157     share->new_op= 0;
2158   }
2159   if (share->op)
2160   {
2161     share->op= 0;
2162   }
2163   native_mutex_unlock(&share->mutex);
2164 
2165   /* Signal ha_ndbcluster::delete/rename_table that drop is done */
2166   DBUG_PRINT("info", ("signal that drop is done"));
2167   (void) native_cond_signal(&injector_cond);
2168 
2169   native_mutex_lock(&ndbcluster_mutex);
2170   /* ndb_share reference binlog free */
2171   DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
2172                            share->key_string(), share->use_count));
2173   free_share(&share, TRUE);
2174 
2175   bool do_close_cached_tables= FALSE;
2176   bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId());
2177   if (is_remote_change && share && share->state != NSS_DROPPED)
2178   {
2179     DBUG_PRINT("info", ("remote change"));
2180     ndbcluster_mark_share_dropped(share);
2181     if (share->use_count != 1)
2182     {
2183       /* open handler holding reference */
2184       /* wait with freeing create ndb_share to below */
2185       do_close_cached_tables= TRUE;
2186     }
2187     else
2188     {
2189       /* ndb_share reference create free */
2190       DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
2191                                share->key_string(), share->use_count));
2192       free_share(&share, TRUE);
2193       share= 0;
2194     }
2195   }
2196   else
2197     share= 0;
2198   native_mutex_unlock(&ndbcluster_mutex);
2199 
2200   DBUG_PRINT("info", ("Deleting event_data"));
2201   delete event_data;
2202   pOp->setCustomData(NULL);
2203 
2204   DBUG_PRINT("info", ("Dropping event operation: %p", pOp));
2205   native_mutex_lock(&injector_mutex);
2206   is_ndb->dropEventOperation(pOp);
2207   native_mutex_unlock(&injector_mutex);
2208 
2209   if (do_close_cached_tables)
2210   {
2211     ndb_tdc_close_cached_table(thd, dbname, tabname);
2212     /* ndb_share reference create free */
2213     DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
2214                              share->key_string(), share->use_count));
2215     free_share(&share);
2216   }
2217   DBUG_RETURN(0);
2218 }
2219 
2220 
2221 class Mutex_guard
2222 {
2223 public:
Mutex_guard(native_mutex_t & mutex)2224   Mutex_guard(native_mutex_t &mutex) : m_mutex(mutex)
2225   {
2226     native_mutex_lock(&m_mutex);
2227   };
~Mutex_guard()2228   ~Mutex_guard()
2229   {
2230     native_mutex_unlock(&m_mutex);
2231   };
2232 private:
2233   native_mutex_t &m_mutex;
2234 };
2235 
2236 
2237 /*
2238   Data used by the Ndb_schema_event_handler which lives
2239   as long as the NDB Binlog thread is connected to the cluster.
2240 
2241   NOTE! An Ndb_schema_event_handler instance only lives for one epoch
2242 
2243  */
2244 class Ndb_schema_dist_data {
2245   static const uint max_ndb_nodes= 256; /* multiple of 32 */
2246   uchar m_data_node_id_list[max_ndb_nodes];
2247   /*
2248     The subscribers to ndb_schema are tracked separately for each
2249     data node. This avoids the need to know which data nodes are
2250     connected.
2251     An api counts as subscribed as soon as one of the data nodes
2252     report it as subscibed.
2253   */
2254   MY_BITMAP *subscriber_bitmap;
2255   unsigned m_num_bitmaps;
2256 
2257   // Holds the new key for a table to be renamed
2258   struct NDB_SHARE_KEY* m_prepared_rename_key;
2259 public:
2260   Ndb_schema_dist_data(const Ndb_schema_dist_data&); // Not implemented
Ndb_schema_dist_data()2261   Ndb_schema_dist_data() :
2262     subscriber_bitmap(NULL),
2263     m_num_bitmaps(0),
2264     m_prepared_rename_key(NULL)
2265   {}
2266 
init(Ndb_cluster_connection * cluster_connection)2267   void init(Ndb_cluster_connection* cluster_connection)
2268   {
2269     // Initialize "g_node_id_map" which maps from nodeid to index in
2270     // subscriber bitmaps array. The mapping array is only used when
2271     // the NDB binlog thread handles events on the mysql.ndb_schema table
2272     uint node_id, i= 0;
2273     Ndb_cluster_connection_node_iter node_iter;
2274     memset((void *)m_data_node_id_list, 0xFFFF, sizeof(m_data_node_id_list));
2275     while ((node_id= cluster_connection->get_next_node(node_iter)))
2276       m_data_node_id_list[node_id]= i++;
2277 
2278     {
2279       // Create array of bitmaps for keeping track of subscribed nodes
2280       unsigned no_nodes= cluster_connection->no_db_nodes();
2281       subscriber_bitmap= (MY_BITMAP*)my_malloc(PSI_INSTRUMENT_ME,
2282                                                no_nodes * sizeof(MY_BITMAP),
2283                                                MYF(MY_WME));
2284       for (unsigned i= 0; i < no_nodes; i++)
2285       {
2286         bitmap_init(&subscriber_bitmap[i],
2287                     (Uint32*)my_malloc(PSI_INSTRUMENT_ME,
2288                                        max_ndb_nodes/8, MYF(MY_WME)),
2289                     max_ndb_nodes, FALSE);
2290         bitmap_clear_all(&subscriber_bitmap[i]);
2291       }
2292       // Remember the number of bitmaps allocated
2293       m_num_bitmaps = no_nodes;
2294     }
2295   }
2296 
release(void)2297   void release(void)
2298   {
2299     if (!m_num_bitmaps)
2300     {
2301       // Allow release without init(), happens when binlog thread
2302       // is terminated before connection to cluster has been made
2303       // NOTE! Should be possible to use static memory for the arrays
2304       return;
2305     }
2306 
2307     for (unsigned i= 0; i < m_num_bitmaps; i++)
2308     {
2309       // Free memory allocated for the bitmap
2310       // allocated by my_malloc() and passed as "buf" to bitmap_init()
2311       bitmap_free(&subscriber_bitmap[i]);
2312     }
2313     // Free memory allocated for the bitmap array
2314     my_free(subscriber_bitmap);
2315     m_num_bitmaps = 0;
2316 
2317     // Release the prepared rename key, it's very unlikely
2318     // that the key is still around here, but just in case
2319     NDB_SHARE::free_key(m_prepared_rename_key);
2320   }
2321 
2322   // Map from nodeid to position in subscriber bitmaps array
map2subscriber_bitmap_index(uint data_node_id) const2323   uint8 map2subscriber_bitmap_index(uint data_node_id) const
2324   {
2325     assert(data_node_id <
2326            (sizeof(m_data_node_id_list)/sizeof(m_data_node_id_list[0])));
2327     const uint8 bitmap_index = m_data_node_id_list[data_node_id];
2328     assert(bitmap_index != 0xFF);
2329     assert(bitmap_index < m_num_bitmaps);
2330     return bitmap_index;
2331   }
2332 
report_data_node_failure(unsigned data_node_id)2333   void report_data_node_failure(unsigned data_node_id)
2334   {
2335     uint8 idx= map2subscriber_bitmap_index(data_node_id);
2336     bitmap_clear_all(&subscriber_bitmap[idx]);
2337     DBUG_PRINT("info",("Data node %u failure", data_node_id));
2338     if (opt_ndb_extra_logging)
2339     {
2340       sql_print_information("NDB Schema dist: Data node: %d failed,"
2341                             " subscriber bitmask %x%x",
2342                             data_node_id,
2343                             subscriber_bitmap[idx].bitmap[1],
2344                             subscriber_bitmap[idx].bitmap[0]);
2345     }
2346   }
2347 
report_subscribe(unsigned data_node_id,unsigned subscriber_node_id)2348   void report_subscribe(unsigned data_node_id, unsigned subscriber_node_id)
2349   {
2350     uint8 idx= map2subscriber_bitmap_index(data_node_id);
2351     assert(subscriber_node_id != 0);
2352     bitmap_set_bit(&subscriber_bitmap[idx], subscriber_node_id);
2353     DBUG_PRINT("info",("Data node %u reported node %u subscribed ",
2354                        data_node_id, subscriber_node_id));
2355     if (opt_ndb_extra_logging)
2356     {
2357       sql_print_information("NDB Schema dist: Data node: %d reports "
2358                             "subscribe from node %d, subscriber bitmask %x%x",
2359                             data_node_id,
2360                             subscriber_node_id,
2361                             subscriber_bitmap[idx].bitmap[1],
2362                             subscriber_bitmap[idx].bitmap[0]);
2363     }
2364   }
2365 
report_unsubscribe(unsigned data_node_id,unsigned subscriber_node_id)2366   void report_unsubscribe(unsigned data_node_id, unsigned subscriber_node_id)
2367   {
2368     uint8 idx= map2subscriber_bitmap_index(data_node_id);
2369     assert(subscriber_node_id != 0);
2370     bitmap_clear_bit(&subscriber_bitmap[idx], subscriber_node_id);
2371     DBUG_PRINT("info",("Data node %u reported node %u unsubscribed ",
2372                        data_node_id, subscriber_node_id));
2373     if (opt_ndb_extra_logging)
2374     {
2375       sql_print_information("NDB Schema dist: Data node: %d reports "
2376                             "subscribe from node %d, subscriber bitmask %x%x",
2377                             data_node_id,
2378                             subscriber_node_id,
2379                             subscriber_bitmap[idx].bitmap[1],
2380                             subscriber_bitmap[idx].bitmap[0]);
2381     }
2382   }
2383 
get_subscriber_bitmask(MY_BITMAP * servers)2384   void get_subscriber_bitmask(MY_BITMAP* servers)
2385   {
2386     for (unsigned i= 0; i < m_num_bitmaps; i++)
2387     {
2388       bitmap_union(servers, &subscriber_bitmap[i]);
2389     }
2390   }
2391 
2392 
save_prepared_rename_key(NDB_SHARE_KEY * key)2393   void save_prepared_rename_key(NDB_SHARE_KEY* key)
2394   {
2395     m_prepared_rename_key = key;
2396   }
2397 
get_prepared_rename_key() const2398   NDB_SHARE_KEY* get_prepared_rename_key() const {
2399     return m_prepared_rename_key;
2400   }
2401 
2402 };
2403 
2404 #include "ndb_local_schema.h"
2405 
2406 class Ndb_schema_event_handler {
2407 
2408   class Ndb_schema_op
2409   {
2410     // Unpack Ndb_schema_op from event_data pointer
unpack_event(const Ndb_event_data * event_data)2411     void unpack_event(const Ndb_event_data *event_data)
2412     {
2413       TABLE *table= event_data->shadow_table;
2414       Field **field;
2415       /* unpack blob values */
2416       uchar* blobs_buffer= 0;
2417       uint blobs_buffer_size= 0;
2418       my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
2419       {
2420         ptrdiff_t ptrdiff= 0;
2421         int ret= get_ndb_blobs_value(table, event_data->ndb_value[0],
2422                                      blobs_buffer, blobs_buffer_size,
2423                                      ptrdiff);
2424         if (ret != 0)
2425         {
2426           my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
2427           DBUG_PRINT("info", ("blob read error"));
2428           assert(FALSE);
2429         }
2430       }
2431       /* db varchar 1 length uchar */
2432       field= table->field;
2433       db_length= *(uint8*)(*field)->ptr;
2434       assert(db_length <= (*field)->field_length);
2435       assert((*field)->field_length + 1 == sizeof(db));
2436       memcpy(db, (*field)->ptr + 1, db_length);
2437       db[db_length]= 0;
2438       /* name varchar 1 length uchar */
2439       field++;
2440       name_length= *(uint8*)(*field)->ptr;
2441       assert(name_length <= (*field)->field_length);
2442       assert((*field)->field_length + 1 == sizeof(name));
2443       memcpy(name, (*field)->ptr + 1, name_length);
2444       name[name_length]= 0;
2445       /* slock fixed length */
2446       field++;
2447       slock_length= (*field)->field_length;
2448       assert((*field)->field_length == sizeof(slock_buf));
2449       memcpy(slock_buf, (*field)->ptr, slock_length);
2450       /* query blob */
2451       field++;
2452       {
2453         Field_blob *field_blob= (Field_blob*)(*field);
2454         uint blob_len= field_blob->get_length((*field)->ptr);
2455         uchar *blob_ptr= 0;
2456         field_blob->get_ptr(&blob_ptr);
2457         assert(blob_len == 0 || blob_ptr != 0);
2458         query_length= blob_len;
2459         query= sql_strmake((char*) blob_ptr, blob_len);
2460       }
2461       /* node_id */
2462       field++;
2463       node_id= (Uint32)((Field_long *)*field)->val_int();
2464       /* epoch */
2465       field++;
2466       epoch= ((Field_long *)*field)->val_int();
2467       /* id */
2468       field++;
2469       id= (Uint32)((Field_long *)*field)->val_int();
2470       /* version */
2471       field++;
2472       version= (Uint32)((Field_long *)*field)->val_int();
2473       /* type */
2474       field++;
2475       type= (Uint32)((Field_long *)*field)->val_int();
2476       /* free blobs buffer */
2477       my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
2478       dbug_tmp_restore_column_map(table->read_set, old_map);
2479     }
2480 
2481   public:
2482     uchar db_length;
2483     char db[64];
2484     uchar name_length;
2485     char name[64];
2486     uchar slock_length;
2487     uint32 slock_buf[SCHEMA_SLOCK_SIZE/4];
2488     MY_BITMAP slock;
2489     unsigned short query_length;
2490     char *query;
2491     Uint64 epoch;
2492     uint32 node_id;
2493     uint32 id;
2494     uint32 version;
2495     uint32 type;
2496     uint32 any_value;
2497 
2498     /**
2499       Create a Ndb_schema_op from event_data
2500     */
2501     static Ndb_schema_op*
create(const Ndb_event_data * event_data,Uint32 any_value)2502     create(const Ndb_event_data* event_data,
2503            Uint32 any_value)
2504     {
2505       DBUG_ENTER("Ndb_schema_op::create");
2506       Ndb_schema_op* schema_op=
2507         (Ndb_schema_op*)sql_alloc(sizeof(Ndb_schema_op));
2508       bitmap_init(&schema_op->slock,
2509                   schema_op->slock_buf, 8*SCHEMA_SLOCK_SIZE, FALSE);
2510       schema_op->unpack_event(event_data);
2511       schema_op->any_value= any_value;
2512       DBUG_PRINT("exit", ("%s.%s: query: '%s'  type: %d",
2513                           schema_op->db, schema_op->name,
2514                           schema_op->query,
2515                           schema_op->type));
2516       DBUG_RETURN(schema_op);
2517     }
2518   };
2519 
2520   static void
print_could_not_discover_error(THD * thd,const Ndb_schema_op * schema)2521   print_could_not_discover_error(THD *thd,
2522                                  const Ndb_schema_op *schema)
2523   {
2524     sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
2525                     "binlog schema event '%s' from node %d. "
2526                     "my_errno: %d",
2527                     schema->db, schema->name, schema->query,
2528                     schema->node_id, my_errno());
2529     thd_print_warning_list(thd, "NDB Binlog");
2530   }
2531 
2532 
2533   static void
write_schema_op_to_binlog(THD * thd,Ndb_schema_op * schema)2534   write_schema_op_to_binlog(THD *thd, Ndb_schema_op *schema)
2535   {
2536 
2537     if (!ndb_binlog_running)
2538     {
2539       // This mysqld is not writing a binlog
2540       return;
2541     }
2542 
2543     /* any_value == 0 means local cluster sourced change that
2544      * should be logged
2545      */
2546     if (ndbcluster_anyvalue_is_reserved(schema->any_value))
2547     {
2548       /* Originating SQL node did not want this query logged */
2549       if (!ndbcluster_anyvalue_is_nologging(schema->any_value))
2550         sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
2551                           "query not logged",
2552                           schema->any_value);
2553       return;
2554     }
2555 
2556     Uint32 queryServerId = ndbcluster_anyvalue_get_serverid(schema->any_value);
2557     /*
2558        Start with serverId as received AnyValue, in case it's a composite
2559        (server_id_bits < 31).
2560        This is for 'future', as currently schema ops do not have composite
2561        AnyValues.
2562        In future it may be useful to support *not* mapping composite
2563        AnyValues to/from Binlogged server-ids.
2564     */
2565     Uint32 loggedServerId = schema->any_value;
2566 
2567     if (queryServerId)
2568     {
2569       /*
2570          AnyValue has non-zero serverId, must be a query applied by a slave
2571          mysqld.
2572          TODO : Assert that we are running in the Binlog injector thread?
2573       */
2574       if (! g_ndb_log_slave_updates)
2575       {
2576         /* This MySQLD does not log slave updates */
2577         return;
2578       }
2579     }
2580     else
2581     {
2582       /* No ServerId associated with this query, mark it as ours */
2583       ndbcluster_anyvalue_set_serverid(loggedServerId, ::server_id);
2584     }
2585 
2586     /*
2587       Write the DDL query to binlog with server_id set
2588       to the server_id where the query originated.
2589     */
2590     const uint32 thd_server_id_save= thd->server_id;
2591     assert(sizeof(thd_server_id_save) == sizeof(thd->server_id));
2592     thd->server_id = loggedServerId;
2593 
2594     LEX_CSTRING thd_db_save= thd->db();
2595     LEX_CSTRING schema_db_lex_cstr= {schema->db, strlen(schema->db)};
2596     thd->reset_db(schema_db_lex_cstr);
2597 
2598     int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
2599     thd->binlog_query(THD::STMT_QUERY_TYPE,
2600                       schema->query, schema->query_length,
2601                       false, // is_trans
2602                       true, // direct
2603                       schema->name[0] == 0 || thd->db().str[0] == 0,
2604                       errcode);
2605 
2606     // Commit the binlog write
2607     (void)trans_commit_stmt(thd);
2608 
2609     /*
2610       Restore original server_id and db after commit
2611       since the server_id is being used also in the commit logic
2612     */
2613     thd->server_id= thd_server_id_save;
2614     thd->reset_db(thd_db_save);
2615   }
2616 
2617 
2618 
2619   /*
2620     Acknowledge handling of schema operation
2621     - Inform the other nodes that schema op has
2622       been completed by this node (by updating the
2623       row for this op in ndb_schema table)
2624   */
2625   int
ack_schema_op(const char * db,const char * table_name,uint32 table_id,uint32 table_version)2626   ack_schema_op(const char *db, const char *table_name,
2627                 uint32 table_id, uint32 table_version)
2628   {
2629     DBUG_ENTER("ack_schema_op");
2630 
2631     const NdbError *ndb_error= 0;
2632     Ndb *ndb= check_ndb_in_thd(m_thd);
2633     char save_db[FN_HEADLEN];
2634     strcpy(save_db, ndb->getDatabaseName());
2635 
2636     char tmp_buf[FN_REFLEN];
2637     NDBDICT *dict= ndb->getDictionary();
2638     ndb->setDatabaseName(NDB_REP_DB);
2639     Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
2640     const NDBTAB *ndbtab= ndbtab_g.get_table();
2641     NdbTransaction *trans= 0;
2642     int retries= 100;
2643     int retry_sleep= 30; /* 30 milliseconds, transaction */
2644     const NDBCOL *col[SCHEMA_SIZE];
2645 
2646     MY_BITMAP slock;
2647     uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
2648     bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
2649 
2650     if (ndbtab == 0)
2651     {
2652       if (dict->getNdbError().code != 4009)
2653         abort();
2654       DBUG_RETURN(0);
2655     }
2656 
2657     {
2658       uint i;
2659       for (i= 0; i < SCHEMA_SIZE; i++)
2660       {
2661         col[i]= ndbtab->getColumn(i);
2662         if (i != SCHEMA_QUERY_I)
2663         {
2664           assert(col[i]->getLength() <= (int)sizeof(tmp_buf));
2665         }
2666       }
2667     }
2668 
2669     while (1)
2670     {
2671       if ((trans= ndb->startTransaction()) == 0)
2672         goto err;
2673       {
2674         NdbOperation *op= 0;
2675         int r= 0;
2676 
2677         /* read the bitmap exlusive */
2678         r|= (op= trans->getNdbOperation(ndbtab)) == 0;
2679         assert(r == 0);
2680         r|= op->readTupleExclusive();
2681         assert(r == 0);
2682 
2683         /* db */
2684         ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, (int)strlen(db));
2685         r|= op->equal(SCHEMA_DB_I, tmp_buf);
2686         assert(r == 0);
2687         /* name */
2688         ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
2689                          (int)strlen(table_name));
2690         r|= op->equal(SCHEMA_NAME_I, tmp_buf);
2691         assert(r == 0);
2692         /* slock */
2693         r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
2694         assert(r == 0);
2695       }
2696       if (trans->execute(NdbTransaction::NoCommit))
2697         goto err;
2698 
2699       if (opt_ndb_extra_logging > 19)
2700       {
2701         uint32 copy[SCHEMA_SLOCK_SIZE/4];
2702         memcpy(copy, bitbuf, sizeof(copy));
2703         bitmap_clear_bit(&slock, own_nodeid());
2704         sql_print_information("NDB: reply to %s.%s(%u/%u) from %x%x to %x%x",
2705                               db, table_name,
2706                               table_id, table_version,
2707                               copy[0], copy[1],
2708                               slock.bitmap[0],
2709                               slock.bitmap[1]);
2710       }
2711       else
2712       {
2713         bitmap_clear_bit(&slock, own_nodeid());
2714       }
2715 
2716       {
2717         NdbOperation *op= 0;
2718         int r= 0;
2719 
2720         /* now update the tuple */
2721         r|= (op= trans->getNdbOperation(ndbtab)) == 0;
2722         assert(r == 0);
2723         r|= op->updateTuple();
2724         assert(r == 0);
2725 
2726         /* db */
2727         ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, (int)strlen(db));
2728         r|= op->equal(SCHEMA_DB_I, tmp_buf);
2729         assert(r == 0);
2730         /* name */
2731         ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
2732                          (int)strlen(table_name));
2733         r|= op->equal(SCHEMA_NAME_I, tmp_buf);
2734         assert(r == 0);
2735         /* slock */
2736         r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
2737         assert(r == 0);
2738         /* node_id */
2739         r|= op->setValue(SCHEMA_NODE_ID_I, own_nodeid());
2740         assert(r == 0);
2741         /* type */
2742         r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
2743         assert(r == 0);
2744       }
2745       if (trans->execute(NdbTransaction::Commit,
2746                          NdbOperation::DefaultAbortOption, 1 /*force send*/) == 0)
2747       {
2748         DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
2749                             own_nodeid(), db, table_name));
2750         dict->forceGCPWait(1);
2751         break;
2752       }
2753     err:
2754       const NdbError *this_error= trans ?
2755         &trans->getNdbError() : &ndb->getNdbError();
2756       if (this_error->status == NdbError::TemporaryError &&
2757           !thd_killed(m_thd))
2758       {
2759         if (retries--)
2760         {
2761           if (trans)
2762             ndb->closeTransaction(trans);
2763           do_retry_sleep(retry_sleep);
2764           continue; // retry
2765         }
2766       }
2767       ndb_error= this_error;
2768       break;
2769     }
2770 
2771     if (ndb_error)
2772     {
2773       sql_print_warning("NDB: Could not release slock on '%s.%s', "
2774                         "Error code: %d Message: %s",
2775                         db, table_name,
2776                         ndb_error->code, ndb_error->message);
2777     }
2778     if (trans)
2779       ndb->closeTransaction(trans);
2780     ndb->setDatabaseName(save_db);
2781     DBUG_RETURN(0);
2782   }
2783 
2784 
check_is_ndb_schema_event(const Ndb_event_data * event_data) const2785   bool check_is_ndb_schema_event(const Ndb_event_data* event_data) const
2786   {
2787     if (!event_data)
2788     {
2789       // Received event without event data pointer
2790       assert(false);
2791       return false;
2792     }
2793 
2794     NDB_SHARE *share= event_data->share;
2795     if (!share)
2796     {
2797       // Received event where the event_data is not properly initialized
2798       assert(false);
2799       return false;
2800     }
2801     assert(event_data->shadow_table);
2802     assert(event_data->ndb_value[0]);
2803     assert(event_data->ndb_value[1]);
2804 
2805     native_mutex_lock(&ndb_schema_share_mutex);
2806     if (share != ndb_schema_share)
2807     {
2808       // Received event from s_ndb not pointing at the ndb_schema_share
2809       native_mutex_unlock(&ndb_schema_share_mutex);
2810       assert(false);
2811       return false;
2812     }
2813     assert(!strncmp(share->db, STRING_WITH_LEN(NDB_REP_DB)));
2814     assert(!strncmp(share->table_name, STRING_WITH_LEN(NDB_SCHEMA_TABLE)));
2815     native_mutex_unlock(&ndb_schema_share_mutex);
2816     return true;
2817   }
2818 
2819 
2820   void
handle_after_epoch(Ndb_schema_op * schema)2821   handle_after_epoch(Ndb_schema_op* schema)
2822   {
2823     DBUG_ENTER("handle_after_epoch");
2824     DBUG_PRINT("info", ("Pushing Ndb_schema_op on list to be "
2825                         "handled after epoch"));
2826     assert(!is_post_epoch()); // Only before epoch
2827     m_post_epoch_handle_list.push_back(schema, m_mem_root);
2828     DBUG_VOID_RETURN;
2829   }
2830 
2831 
2832   void
ack_after_epoch(Ndb_schema_op * schema)2833   ack_after_epoch(Ndb_schema_op* schema)
2834   {
2835     DBUG_ENTER("ack_after_epoch");
2836     assert(!is_post_epoch()); // Only before epoch
2837     m_post_epoch_ack_list.push_back(schema, m_mem_root);
2838     DBUG_VOID_RETURN;
2839   }
2840 
2841 
own_nodeid(void) const2842   uint own_nodeid(void) const
2843   {
2844     return m_own_nodeid;
2845   }
2846 
2847 
2848   void
ndbapi_invalidate_table(const char * db_name,const char * table_name) const2849   ndbapi_invalidate_table(const char* db_name, const char* table_name) const
2850   {
2851     DBUG_ENTER("ndbapi_invalidate_table");
2852     Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
2853     Ndb *ndb= thd_ndb->ndb;
2854 
2855     ndb->setDatabaseName(db_name);
2856     Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
2857     ndbtab_g.invalidate();
2858     DBUG_VOID_RETURN;
2859   }
2860 
2861 
2862   void
mysqld_close_cached_table(const char * db_name,const char * table_name) const2863   mysqld_close_cached_table(const char* db_name, const char* table_name) const
2864   {
2865     DBUG_ENTER("mysqld_close_cached_table");
2866      // Just mark table as "need reopen"
2867     const bool wait_for_refresh = false;
2868     // Not waiting -> no timeout needed
2869     const ulong timeout = 0;
2870 
2871     TABLE_LIST table_list;
2872     memset(&table_list, 0, sizeof(table_list));
2873     table_list.db= (char*)db_name;
2874     table_list.alias= table_list.table_name= (char*)table_name;
2875 
2876     close_cached_tables(m_thd, &table_list,
2877                         wait_for_refresh, timeout);
2878     DBUG_VOID_RETURN;
2879   }
2880 
2881 
2882   void
mysqld_write_frm_from_ndb(const char * db_name,const char * table_name) const2883   mysqld_write_frm_from_ndb(const char* db_name,
2884                             const char* table_name) const
2885   {
2886     DBUG_ENTER("mysqld_write_frm_from_ndb");
2887     Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
2888     Ndb *ndb= thd_ndb->ndb;
2889     Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
2890     const NDBTAB *ndbtab= ndbtab_g.get_table();
2891     if (!ndbtab)
2892     {
2893       /*
2894         Bug#14773491 reports crash in 'cmp_frm' due to
2895         ndbtab* being NULL -> bail out here
2896       */
2897       sql_print_error("NDB schema: Could not find table '%s.%s' in NDB",
2898                       db_name, table_name);
2899       assert(false);
2900       DBUG_VOID_RETURN;
2901     }
2902 
2903     char key[FN_REFLEN];
2904     build_table_filename(key, sizeof(key)-1,
2905                          db_name, table_name, NullS, 0);
2906 
2907     uchar *data= 0, *pack_data= 0;
2908     size_t length, pack_length;
2909 
2910     if (readfrm(key, &data, &length) == 0 &&
2911         packfrm(data, length, &pack_data, &pack_length) == 0 &&
2912         cmp_frm(ndbtab, pack_data, pack_length))
2913     {
2914       DBUG_PRINT("info", ("Detected frm change of table %s.%s",
2915                           db_name, table_name));
2916 
2917       DBUG_DUMP("frm", (uchar*) ndbtab->getFrmData(),
2918                         ndbtab->getFrmLength());
2919       my_free(data);
2920       data= NULL;
2921 
2922       int error;
2923       if ((error= unpackfrm(&data, &length,
2924                             (const uchar*) ndbtab->getFrmData())) ||
2925           (error= writefrm(key, data, length)))
2926       {
2927         sql_print_error("NDB: Failed write frm for %s.%s, error %d",
2928                         db_name, table_name, error);
2929       }
2930     }
2931     my_free(data);
2932     my_free(pack_data);
2933     DBUG_VOID_RETURN;
2934   }
2935 
2936 
get_share(Ndb_schema_op * schema) const2937   NDB_SHARE* get_share(Ndb_schema_op* schema) const
2938   {
2939     DBUG_ENTER("get_share(Ndb_schema_op*)");
2940     char key[FN_REFLEN + 1];
2941     build_table_filename(key, sizeof(key) - 1,
2942                          schema->db, schema->name, "", 0);
2943     NDB_SHARE *share= ndbcluster_get_share(key, 0, FALSE, FALSE);
2944     if (share)
2945     {
2946       DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
2947                                share->key_string(), share->use_count));
2948     }
2949     DBUG_RETURN(share);
2950   }
2951 
2952 
2953   bool
check_if_local_tables_in_db(const char * dbname) const2954   check_if_local_tables_in_db(const char *dbname) const
2955   {
2956     DBUG_ENTER("check_if_local_tables_in_db");
2957     DBUG_PRINT("info", ("Looking for files in directory %s", dbname));
2958     Ndb_find_files_list files(m_thd);
2959     char path[FN_REFLEN + 1];
2960     build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0);
2961     if (!files.find_tables(dbname, path))
2962     {
2963       m_thd->clear_error();
2964       DBUG_PRINT("info", ("Failed to find files"));
2965       DBUG_RETURN(true);
2966     }
2967     DBUG_PRINT("info",("found: %d files", files.found_files()));
2968 
2969     LEX_STRING *tabname;
2970     while ((tabname= files.next()))
2971     {
2972       DBUG_PRINT("info", ("Found table %s", tabname->str));
2973       if (ndbcluster_check_if_local_table(dbname, tabname->str))
2974         DBUG_RETURN(true);
2975     }
2976 
2977     DBUG_RETURN(false);
2978   }
2979 
2980 
is_local_table(const char * db_name,const char * table_name) const2981   bool is_local_table(const char* db_name, const char* table_name) const
2982   {
2983     return ndbcluster_check_if_local_table(db_name, table_name);
2984   }
2985 
2986 
handle_clear_slock(Ndb_schema_op * schema)2987   void handle_clear_slock(Ndb_schema_op* schema)
2988   {
2989     DBUG_ENTER("handle_clear_slock");
2990 
2991     assert(is_post_epoch());
2992 
2993     char key[FN_REFLEN + 1];
2994     build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
2995 
2996     /* Ack to any SQL thread waiting for schema op to complete */
2997     NDB_SCHEMA_OBJECT *ndb_schema_object= ndb_get_schema_object(key, false);
2998     if (!ndb_schema_object)
2999     {
3000       /* Noone waiting for this schema op in this mysqld */
3001       if (opt_ndb_extra_logging > 19)
3002         sql_print_information("NDB: Discarding event...no obj: %s (%u/%u)",
3003                               key, schema->id, schema->version);
3004       DBUG_VOID_RETURN;
3005     }
3006 
3007     if (ndb_schema_object->table_id != schema->id ||
3008         ndb_schema_object->table_version != schema->version)
3009     {
3010       /* Someone waiting, but for another id/version... */
3011       if (opt_ndb_extra_logging > 19)
3012         sql_print_information("NDB: Discarding event...key: %s "
3013                               "non matching id/version [%u/%u] != [%u/%u]",
3014                               key,
3015                               ndb_schema_object->table_id,
3016                               ndb_schema_object->table_version,
3017                               schema->id,
3018                               schema->version);
3019       ndb_free_schema_object(&ndb_schema_object);
3020       DBUG_VOID_RETURN;
3021     }
3022 
3023     // Build bitmask of subscribers
3024     MY_BITMAP servers;
3025     bitmap_init(&servers, 0, 256, FALSE);
3026     bitmap_clear_all(&servers);
3027     bitmap_set_bit(&servers, own_nodeid()); // "we" are always alive
3028     m_schema_dist_data.get_subscriber_bitmask(&servers);
3029 
3030     /*
3031       Copy the latest slock info into the ndb_schema_object so that
3032       waiter can check if all nodes it's waiting for has answered
3033     */
3034     native_mutex_lock(&ndb_schema_object->mutex);
3035     if (opt_ndb_extra_logging > 19)
3036     {
3037       sql_print_information("NDB: CLEAR_SLOCK key: %s(%u/%u) from"
3038                             " %x%x to %x%x",
3039                             key, schema->id, schema->version,
3040                             ndb_schema_object->slock[0],
3041                             ndb_schema_object->slock[1],
3042                             schema->slock_buf[0],
3043                             schema->slock_buf[1]);
3044     }
3045     memcpy(ndb_schema_object->slock, schema->slock_buf,
3046            sizeof(ndb_schema_object->slock));
3047     DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
3048               (uchar*)ndb_schema_object->slock_bitmap.bitmap,
3049               no_bytes_in_map(&ndb_schema_object->slock_bitmap));
3050 
3051     /* remove any unsubscribed from ndb_schema_object->slock */
3052     bitmap_intersect(&ndb_schema_object->slock_bitmap, &servers);
3053     bitmap_free(&servers);
3054 
3055     DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
3056               (uchar*)ndb_schema_object->slock_bitmap.bitmap,
3057               no_bytes_in_map(&ndb_schema_object->slock_bitmap));
3058     native_mutex_unlock(&ndb_schema_object->mutex);
3059 
3060     ndb_free_schema_object(&ndb_schema_object);
3061 
3062     /* Wake up the waiter */
3063     native_cond_signal(&injector_cond);
3064 
3065     DBUG_VOID_RETURN;
3066   }
3067 
3068 
3069   void
handle_offline_alter_table_commit(Ndb_schema_op * schema)3070   handle_offline_alter_table_commit(Ndb_schema_op* schema)
3071   {
3072     DBUG_ENTER("handle_offline_alter_table_commit");
3073 
3074     assert(is_post_epoch()); // Always after epoch
3075 
3076     if (schema->node_id == own_nodeid())
3077       DBUG_VOID_RETURN;
3078 
3079     write_schema_op_to_binlog(m_thd, schema);
3080     ndbapi_invalidate_table(schema->db, schema->name);
3081     mysqld_close_cached_table(schema->db, schema->name);
3082 
3083     /**
3084      * Note about get_share() / free_share() referrences:
3085      *
3086      *  1) All shares have a ref count related to their 'discovery' by dictionary.
3087      *     (Until they are 'dropped')
3088      *  2) All shares are referred by the binlog thread if its DDL operations
3089      *     should be replicated with schema events ('share->op != NULL')
3090      *  3) All shares are ref counted when they are temporarily referred
3091      *     inside a function. (as below)
3092      */
3093     NDB_SHARE *share= get_share(schema);  // 3) Temporary pin 'share'
3094     if (share)
3095     {
3096       native_mutex_lock(&share->mutex);
3097       if (share->op)
3098       {
3099         Ndb_event_data *event_data=
3100           (Ndb_event_data *) share->op->getCustomData();
3101         if (event_data)
3102           delete event_data;
3103         share->op->setCustomData(NULL);
3104         {
3105           Mutex_guard injector_mutex_g(injector_mutex);
3106           injector_ndb->dropEventOperation(share->op);
3107         }
3108         share->op= 0;
3109         free_share(&share);   // Free binlog ref, 2)
3110         assert(share);   // Still ref'ed by 1) & 3)
3111       }
3112       native_mutex_unlock(&share->mutex);
3113       free_share(&share);   // Free temporary ref, 3)
3114       assert(share);   // Still ref'ed by dict, 1)
3115 
3116       /**
3117        * Finaly unref. from dictionary, 1).
3118        * If this was the last share ref, it will be deleted.
3119        * If there are more (trailing) references, the share will remain as an
3120        * unvisible instance in the share-hash until remaining references are dropped.
3121        */
3122       native_mutex_lock(&ndbcluster_mutex);
3123       handle_trailing_share(m_thd, share); // Unref my 'share', and make any pending refs 'trailing'
3124       share= 0;                            // It's gone
3125       native_mutex_unlock(&ndbcluster_mutex);
3126     } // if (share)
3127 
3128     if (is_local_table(schema->db, schema->name) &&
3129        !Ndb_dist_priv_util::is_distributed_priv_table(schema->db,
3130                                                       schema->name))
3131     {
3132       sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' "
3133                       "from binlog schema event '%s' from node %d.",
3134                       schema->db, schema->name, schema->query,
3135                       schema->node_id);
3136       DBUG_VOID_RETURN;
3137     }
3138 
3139     // Instantiate a new 'share' for the altered table.
3140     if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
3141     {
3142       print_could_not_discover_error(m_thd, schema);
3143     }
3144     DBUG_VOID_RETURN;
3145   }
3146 
3147 
3148   void
handle_online_alter_table_prepare(Ndb_schema_op * schema)3149   handle_online_alter_table_prepare(Ndb_schema_op* schema)
3150   {
3151     assert(is_post_epoch()); // Always after epoch
3152 
3153     ndbapi_invalidate_table(schema->db, schema->name);
3154     mysqld_close_cached_table(schema->db, schema->name);
3155 
3156     if (schema->node_id != own_nodeid())
3157     {
3158       write_schema_op_to_binlog(m_thd, schema);
3159       if (!is_local_table(schema->db, schema->name))
3160       {
3161         mysqld_write_frm_from_ndb(schema->db, schema->name);
3162       }
3163     }
3164   }
3165 
3166 
3167   void
handle_online_alter_table_commit(Ndb_schema_op * schema)3168   handle_online_alter_table_commit(Ndb_schema_op* schema)
3169   {
3170     assert(is_post_epoch()); // Always after epoch
3171 
3172     NDB_SHARE *share= get_share(schema);
3173     if (share)
3174     {
3175       if (opt_ndb_extra_logging > 9)
3176         sql_print_information("NDB Binlog: handling online alter/rename");
3177 
3178       native_mutex_lock(&share->mutex);
3179       ndb_binlog_close_shadow_table(share);
3180 
3181       if (ndb_binlog_open_shadow_table(m_thd, share))
3182       {
3183         sql_print_error("NDB Binlog: Failed to re-open shadow table %s.%s",
3184                         schema->db, schema->name);
3185         native_mutex_unlock(&share->mutex);
3186       }
3187       else
3188       {
3189         /*
3190           Start subscribing to data changes to the new table definition
3191         */
3192         String event_name(INJECTOR_EVENT_LEN);
3193         ndb_rep_event_name(&event_name, schema->db, schema->name,
3194                            get_binlog_full(share));
3195         NdbEventOperation *tmp_op= share->op;
3196         share->new_op= 0;
3197         share->op= 0;
3198 
3199         Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
3200         Ndb *ndb= thd_ndb->ndb;
3201         Ndb_table_guard ndbtab_g(ndb->getDictionary(), schema->name);
3202         const NDBTAB *ndbtab= ndbtab_g.get_table();
3203         if (ndbcluster_create_event_ops(m_thd, share, ndbtab,
3204                                         event_name.c_ptr()))
3205         {
3206           sql_print_error("NDB Binlog:"
3207                           "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
3208                           event_name.c_ptr());
3209         }
3210         else
3211         {
3212           share->new_op= share->op;
3213         }
3214         share->op= tmp_op;
3215         native_mutex_unlock(&share->mutex);
3216 
3217         if (opt_ndb_extra_logging > 9)
3218           sql_print_information("NDB Binlog: handling online "
3219                                 "alter/rename done");
3220       }
3221       native_mutex_lock(&share->mutex);
3222       if (share->op && share->new_op)
3223       {
3224         Ndb_event_data *event_data=
3225           (Ndb_event_data *) share->op->getCustomData();
3226         if (event_data)
3227           delete event_data;
3228         share->op->setCustomData(NULL);
3229         {
3230           Mutex_guard injector_mutex_g(injector_mutex);
3231           injector_ndb->dropEventOperation(share->op);
3232         }
3233         share->op= share->new_op;
3234         share->new_op= 0;
3235         free_share(&share);
3236         assert(share);   // Should still be ref'ed
3237       }
3238       native_mutex_unlock(&share->mutex);
3239 
3240       free_share(&share);
3241     }
3242   }
3243 
3244 
3245   void
handle_drop_table(Ndb_schema_op * schema)3246   handle_drop_table(Ndb_schema_op* schema)
3247   {
3248     DBUG_ENTER("handle_drop_table");
3249 
3250     assert(is_post_epoch()); // Always after epoch
3251 
3252     if (schema->node_id == own_nodeid())
3253       DBUG_VOID_RETURN;
3254 
3255     write_schema_op_to_binlog(m_thd, schema);
3256 
3257     Ndb_local_schema::Table tab(m_thd, schema->db, schema->name);
3258     if (tab.is_local_table())
3259     {
3260       /* Table is not a NDB table in this mysqld -> leave it */
3261       sql_print_error("NDB Binlog: Skipping drop of locally "
3262                       "defined table '%s.%s' from binlog schema "
3263                       "event '%s' from node %d. ",
3264                       schema->db, schema->name, schema->query,
3265                       schema->node_id);
3266 
3267       // There should be no NDB_SHARE for this table
3268       assert(!get_share(schema));
3269 
3270       DBUG_VOID_RETURN;
3271     }
3272 
3273     tab.remove_table();
3274 
3275     NDB_SHARE *share= get_share(schema); // temporary ref.
3276     if (!share || !share->op)
3277     {
3278       ndbapi_invalidate_table(schema->db, schema->name);
3279       mysqld_close_cached_table(schema->db, schema->name);
3280     }
3281     if (share)
3282     {
3283       free_share(&share); // temporary ref.
3284       assert(share); // Should still be ref'ed
3285       free_share(&share); // server ref.
3286     }
3287 
3288     ndbapi_invalidate_table(schema->db, schema->name);
3289     mysqld_close_cached_table(schema->db, schema->name);
3290 
3291     DBUG_VOID_RETURN;
3292   }
3293 
3294 
3295   /*
3296     The RENAME is performed in two steps.
3297     1) PREPARE_RENAME - sends the new table key to participants
3298     2) RENAME - perform the actual rename
3299   */
3300 
3301   void
handle_rename_table_prepare(Ndb_schema_op * schema)3302   handle_rename_table_prepare(Ndb_schema_op* schema)
3303   {
3304     DBUG_ENTER("handle_rename_table_prepare");
3305 
3306     assert(is_post_epoch()); // Always after epoch
3307 
3308     if (schema->node_id == own_nodeid())
3309       DBUG_VOID_RETURN;
3310 
3311     const char* new_key_for_table= schema->query;
3312     DBUG_PRINT("info", ("new_key_for_table: '%s'", new_key_for_table));
3313 
3314     // Release potentially previously prepared new_key
3315     {
3316       NDB_SHARE_KEY* old_prepared_key =
3317           m_schema_dist_data.get_prepared_rename_key();
3318       if (old_prepared_key)
3319         NDB_SHARE::free_key(old_prepared_key);
3320     }
3321 
3322     // Create a new key save it, then hope for the best(i.e
3323     // that it can be found later when the RENAME arrives)
3324     NDB_SHARE_KEY* new_prepared_key =
3325         NDB_SHARE::create_key(new_key_for_table);
3326     m_schema_dist_data.save_prepared_rename_key(new_prepared_key);
3327 
3328     DBUG_VOID_RETURN;
3329   }
3330 
3331 
3332   void
handle_rename_table(Ndb_schema_op * schema)3333   handle_rename_table(Ndb_schema_op* schema)
3334   {
3335     DBUG_ENTER("handle_rename_table");
3336 
3337     assert(is_post_epoch()); // Always after epoch
3338 
3339     if (schema->node_id == own_nodeid())
3340       DBUG_VOID_RETURN;
3341 
3342     write_schema_op_to_binlog(m_thd, schema);
3343 
3344     Ndb_local_schema::Table from(m_thd, schema->db, schema->name);
3345     if (from.is_local_table())
3346     {
3347       /* Tables exists as a local table, print error and leave it */
3348       sql_print_error("NDB Binlog: Skipping renaming locally "
3349                       "defined table '%s.%s' from binlog schema "
3350                       "event '%s' from node %d. ",
3351                       schema->db, schema->name, schema->query,
3352                       schema->node_id);
3353       DBUG_VOID_RETURN;
3354     }
3355 
3356     NDB_SHARE *share= get_share(schema); // temporary ref.
3357     if (!share || !share->op)
3358     {
3359       ndbapi_invalidate_table(schema->db, schema->name);
3360       mysqld_close_cached_table(schema->db, schema->name);
3361     }
3362     if (share)
3363       free_share(&share);  // temporary ref.
3364 
3365     share= get_share(schema);  // temporary ref.
3366     if (!share)
3367     {
3368       // The RENAME need to find share so it can be renamed
3369       assert(share);
3370       DBUG_VOID_RETURN;
3371     }
3372 
3373     NDB_SHARE_KEY* prepared_key =
3374         m_schema_dist_data.get_prepared_rename_key();
3375     if (!prepared_key)
3376     {
3377       // The rename need to have new_key set
3378       // by a previous RENAME_PREPARE
3379       assert(prepared_key);
3380       DBUG_VOID_RETURN;
3381     }
3382 
3383     // Rename on participant is always from real to
3384     // real name(i.e neiher old or new name should be a temporary name)
3385     assert(!IS_TMP_PREFIX(schema->name));
3386     assert(!IS_TMP_PREFIX(NDB_SHARE::key_get_table_name(prepared_key)));
3387 
3388     // Rename the local table
3389     from.rename_table(NDB_SHARE::key_get_db_name(prepared_key),
3390                       NDB_SHARE::key_get_table_name(prepared_key));
3391 
3392     // Rename share and release the old key
3393     NDB_SHARE_KEY* old_key = share->key;
3394     ndbcluster_rename_share(m_thd, share, prepared_key);
3395     m_schema_dist_data.save_prepared_rename_key(NULL);
3396     NDB_SHARE::free_key(old_key);
3397 
3398     free_share(&share);  // temporary ref.
3399 
3400     ndbapi_invalidate_table(schema->db, schema->name);
3401     mysqld_close_cached_table(schema->db, schema->name);
3402 
3403     DBUG_VOID_RETURN;
3404   }
3405 
3406 
3407   void
handle_drop_db(Ndb_schema_op * schema)3408   handle_drop_db(Ndb_schema_op* schema)
3409   {
3410     DBUG_ENTER("handle_drop_db");
3411 
3412     assert(is_post_epoch()); // Always after epoch
3413 
3414     if (schema->node_id == own_nodeid())
3415       DBUG_VOID_RETURN;
3416 
3417     write_schema_op_to_binlog(m_thd, schema);
3418 
3419     Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
3420     Thd_ndb_options_guard thd_ndb_options(thd_ndb);
3421     // Set NO_LOCK_SCHEMA_OP before 'check_if_local_tables_indb'
3422     // until ndbcluster_find_files does not take GSL
3423     thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
3424 
3425     if (check_if_local_tables_in_db(schema->db))
3426     {
3427       /* Tables exists as a local table, print error and leave it */
3428       sql_print_error("NDB Binlog: Skipping drop database '%s' since "
3429                       "it contained local tables "
3430                       "binlog schema event '%s' from node %d. ",
3431                       schema->db, schema->query,
3432                       schema->node_id);
3433       DBUG_VOID_RETURN;
3434     }
3435 
3436     const int no_print_error[1]= {0};
3437     run_query(m_thd, schema->query,
3438               schema->query + schema->query_length,
3439               no_print_error);
3440 
3441     DBUG_VOID_RETURN;
3442   }
3443 
3444 
3445   void
handle_truncate_table(Ndb_schema_op * schema)3446   handle_truncate_table(Ndb_schema_op* schema)
3447   {
3448     DBUG_ENTER("handle_truncate_table");
3449 
3450     assert(!is_post_epoch()); // Always directly
3451 
3452     if (schema->node_id == own_nodeid())
3453       DBUG_VOID_RETURN;
3454 
3455     write_schema_op_to_binlog(m_thd, schema);
3456 
3457     NDB_SHARE *share= get_share(schema);
3458     // invalidation already handled by binlog thread
3459     if (!share || !share->op)
3460     {
3461       ndbapi_invalidate_table(schema->db, schema->name);
3462       mysqld_close_cached_table(schema->db, schema->name);
3463     }
3464     if (share)
3465       free_share(&share);
3466 
3467     if (is_local_table(schema->db, schema->name))
3468     {
3469       sql_print_error("NDB Binlog: Skipping locally defined table "
3470                       "'%s.%s' from binlog schema event '%s' from "
3471                       "node %d. ",
3472                       schema->db, schema->name, schema->query,
3473                       schema->node_id);
3474       DBUG_VOID_RETURN;
3475     }
3476 
3477     if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
3478     {
3479       print_could_not_discover_error(m_thd, schema);
3480     }
3481 
3482     DBUG_VOID_RETURN;
3483   }
3484 
3485 
3486   void
handle_create_table(Ndb_schema_op * schema)3487   handle_create_table(Ndb_schema_op* schema)
3488   {
3489     DBUG_ENTER("handle_create_table");
3490 
3491     assert(!is_post_epoch()); // Always directly
3492 
3493     if (schema->node_id == own_nodeid())
3494       DBUG_VOID_RETURN;
3495 
3496     write_schema_op_to_binlog(m_thd, schema);
3497 
3498     if (is_local_table(schema->db, schema->name))
3499     {
3500       sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
3501                           "binlog schema event '%s' from node %d. ",
3502                           schema->db, schema->name, schema->query,
3503                           schema->node_id);
3504       DBUG_VOID_RETURN;
3505     }
3506 
3507     if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
3508     {
3509       print_could_not_discover_error(m_thd, schema);
3510     }
3511 
3512     DBUG_VOID_RETURN;
3513   }
3514 
3515 
3516   void
handle_create_db(Ndb_schema_op * schema)3517   handle_create_db(Ndb_schema_op* schema)
3518   {
3519     DBUG_ENTER("handle_create_db");
3520 
3521     assert(!is_post_epoch()); // Always directly
3522 
3523     if (schema->node_id == own_nodeid())
3524       DBUG_VOID_RETURN;
3525 
3526     write_schema_op_to_binlog(m_thd, schema);
3527 
3528     Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
3529     Thd_ndb_options_guard thd_ndb_options(thd_ndb);
3530     thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
3531     const int no_print_error[1]= {0};
3532     run_query(m_thd, schema->query,
3533               schema->query + schema->query_length,
3534               no_print_error);
3535 
3536     DBUG_VOID_RETURN;
3537   }
3538 
3539 
3540   void
handle_alter_db(Ndb_schema_op * schema)3541   handle_alter_db(Ndb_schema_op* schema)
3542   {
3543     DBUG_ENTER("handle_alter_db");
3544 
3545     assert(!is_post_epoch()); // Always directly
3546 
3547     if (schema->node_id == own_nodeid())
3548       DBUG_VOID_RETURN;
3549 
3550     write_schema_op_to_binlog(m_thd, schema);
3551 
3552     Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
3553     Thd_ndb_options_guard thd_ndb_options(thd_ndb);
3554     thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
3555     const int no_print_error[1]= {0};
3556     run_query(m_thd, schema->query,
3557               schema->query + schema->query_length,
3558               no_print_error);
3559 
3560     DBUG_VOID_RETURN;
3561   }
3562 
3563 
3564   void
handle_grant_op(Ndb_schema_op * schema)3565   handle_grant_op(Ndb_schema_op* schema)
3566   {
3567     DBUG_ENTER("handle_grant_op");
3568 
3569     assert(!is_post_epoch()); // Always directly
3570 
3571     if (schema->node_id == own_nodeid())
3572       DBUG_VOID_RETURN;
3573 
3574     write_schema_op_to_binlog(m_thd, schema);
3575 
3576     if (opt_ndb_extra_logging > 9)
3577       sql_print_information("Got dist_priv event: %s, "
3578                             "flushing privileges",
3579                             get_schema_type_name(schema->type));
3580 
3581     Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
3582     Thd_ndb_options_guard thd_ndb_options(thd_ndb);
3583     thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
3584     const int no_print_error[1]= {0};
3585     char *cmd= (char *) "flush privileges";
3586     run_query(m_thd, cmd,
3587               cmd + strlen(cmd),
3588               no_print_error);
3589 
3590     DBUG_VOID_RETURN;
3591   }
3592 
3593 
3594   int
handle_schema_op(Ndb_schema_op * schema)3595   handle_schema_op(Ndb_schema_op* schema)
3596   {
3597     DBUG_ENTER("handle_schema_op");
3598     {
3599       const SCHEMA_OP_TYPE schema_type= (SCHEMA_OP_TYPE)schema->type;
3600 
3601       if (opt_ndb_extra_logging > 19)
3602       {
3603         sql_print_information("NDB: got schema event on %s.%s(%u/%u) query: '%s' type: %s(%d) node: %u slock: %x%x",
3604                               schema->db, schema->name,
3605                               schema->id, schema->version,
3606                               schema->query,
3607                               get_schema_type_name(schema_type),
3608                               schema_type,
3609                               schema->node_id,
3610                               schema->slock.bitmap[0],
3611                               schema->slock.bitmap[1]);
3612       }
3613 
3614       if ((schema->db[0] == 0) && (schema->name[0] == 0))
3615       {
3616         /**
3617          * This happens if there is a schema event on a table (object)
3618          *   that this mysqld does not know about.
3619          *   E.g it had a local table shadowing a ndb table...
3620          */
3621         DBUG_RETURN(0);
3622       }
3623 
3624       switch (schema_type)
3625       {
3626       case SOT_CLEAR_SLOCK:
3627         /*
3628           handle slock after epoch is completed to ensure that
3629           schema events get inserted in the binlog after any data
3630           events
3631         */
3632         handle_after_epoch(schema);
3633         DBUG_RETURN(0);
3634 
3635       case SOT_ALTER_TABLE_COMMIT:
3636       case SOT_RENAME_TABLE_PREPARE:
3637       case SOT_ONLINE_ALTER_TABLE_PREPARE:
3638       case SOT_ONLINE_ALTER_TABLE_COMMIT:
3639       case SOT_RENAME_TABLE:
3640       case SOT_DROP_TABLE:
3641       case SOT_DROP_DB:
3642         handle_after_epoch(schema);
3643         ack_after_epoch(schema);
3644         DBUG_RETURN(0);
3645 
3646       case SOT_TRUNCATE_TABLE:
3647         handle_truncate_table(schema);
3648         break;
3649 
3650       case SOT_CREATE_TABLE:
3651         handle_create_table(schema);
3652         break;
3653 
3654       case SOT_CREATE_DB:
3655         handle_create_db(schema);
3656         break;
3657 
3658       case SOT_ALTER_DB:
3659         handle_alter_db(schema);
3660         break;
3661 
3662       case SOT_CREATE_USER:
3663       case SOT_DROP_USER:
3664       case SOT_RENAME_USER:
3665       case SOT_GRANT:
3666       case SOT_REVOKE:
3667         handle_grant_op(schema);
3668         break;
3669 
3670       case SOT_TABLESPACE:
3671       case SOT_LOGFILE_GROUP:
3672         if (schema->node_id == own_nodeid())
3673           break;
3674         write_schema_op_to_binlog(m_thd, schema);
3675         break;
3676 
3677       case SOT_RENAME_TABLE_NEW:
3678         /*
3679           Only very old MySQL Server connected to the cluster may
3680           send this schema operation, ignore it
3681         */
3682         sql_print_error("NDB schema: Skipping old schema operation"
3683                         "(RENAME_TABLE_NEW) on %s.%s",
3684                         schema->db, schema->name);
3685         assert(false);
3686         break;
3687 
3688       }
3689 
3690       /* signal that schema operation has been handled */
3691       DBUG_DUMP("slock", (uchar*) schema->slock_buf, schema->slock_length);
3692       if (bitmap_is_set(&schema->slock, own_nodeid()))
3693       {
3694         ack_schema_op(schema->db, schema->name,
3695                       schema->id, schema->version);
3696       }
3697     }
3698     DBUG_RETURN(0);
3699   }
3700 
3701 
3702   void
handle_schema_op_post_epoch(Ndb_schema_op * schema)3703   handle_schema_op_post_epoch(Ndb_schema_op* schema)
3704   {
3705     DBUG_ENTER("handle_schema_op_post_epoch");
3706     DBUG_PRINT("enter", ("%s.%s: query: '%s'  type: %d",
3707                          schema->db, schema->name,
3708                          schema->query, schema->type));
3709 
3710     {
3711       const SCHEMA_OP_TYPE schema_type= (SCHEMA_OP_TYPE)schema->type;
3712       if (opt_ndb_extra_logging > 9)
3713         sql_print_information("%s - %s.%s",
3714                               get_schema_type_name(schema_type),
3715                               schema->db ? schema->db : "(null)",
3716                               schema->name ? schema->name : "(null)");
3717 
3718       switch (schema_type)
3719       {
3720       case SOT_CLEAR_SLOCK:
3721         handle_clear_slock(schema);
3722         break;
3723 
3724       case SOT_DROP_DB:
3725         handle_drop_db(schema);
3726         break;
3727 
3728       case SOT_DROP_TABLE:
3729         handle_drop_table(schema);
3730         break;
3731 
3732       case SOT_RENAME_TABLE_PREPARE:
3733         handle_rename_table_prepare(schema);
3734         break;
3735 
3736       case SOT_RENAME_TABLE:
3737         handle_rename_table(schema);
3738         break;
3739 
3740       case SOT_ALTER_TABLE_COMMIT:
3741         handle_offline_alter_table_commit(schema);
3742         break;
3743 
3744       case SOT_ONLINE_ALTER_TABLE_PREPARE:
3745         handle_online_alter_table_prepare(schema);
3746         break;
3747 
3748       case SOT_ONLINE_ALTER_TABLE_COMMIT:
3749         handle_online_alter_table_commit(schema);
3750         break;
3751 
3752       default:
3753         assert(FALSE);
3754       }
3755     }
3756 
3757     DBUG_VOID_RETURN;
3758   }
3759 
  // Thread handle passed to the helpers called by the handle_* functions
  THD* m_thd;
  // Memory root given to the constructor (its use is not visible here)
  MEM_ROOT* m_mem_root;
  // Node id given to the constructor
  // NOTE(review): presumably the value returned by own_nodeid() - confirm
  uint m_own_nodeid;
  // Receives the subscribe, unsubscribe and node failure reports
  // from handle_event()
  Ndb_schema_dist_data& m_schema_dist_data;
  // Set to true by post_epoch() when it starts processing the deferred
  // schema operations, used to check that functions are called at the
  // correct time
  bool m_post_epoch;

  // Returns true once post_epoch() has started processing deferred work
  bool is_post_epoch(void) const { return m_post_epoch; }

  // Schema operations to be handled by post_epoch()
  List<Ndb_schema_op> m_post_epoch_handle_list;
  // Schema operations to be acked by post_epoch()
  List<Ndb_schema_op> m_post_epoch_ack_list;
3770 
3771 public:
  // Default and copy constructor are declared but intentionally left
  // without implementation
  Ndb_schema_event_handler(); // Not implemented
  Ndb_schema_event_handler(const Ndb_schema_event_handler&); // Not implemented

  /*
    Create a schema event handler.

    thd               thread handle stored in m_thd
    mem_root          memory root stored in m_mem_root
    own_nodeid        node id stored in m_own_nodeid
    schema_dist_data  schema distribution data, held by reference

    The handler starts out in the "not post epoch" state.
  */
  Ndb_schema_event_handler(THD* thd, MEM_ROOT* mem_root, uint own_nodeid,
                           Ndb_schema_dist_data& schema_dist_data):
    m_thd(thd), m_mem_root(mem_root), m_own_nodeid(own_nodeid),
    m_schema_dist_data(schema_dist_data),
    m_post_epoch(false)
  {
  }
3782 
3783 
  /*
    Destroy the schema event handler.

    Both post epoch lists are drained by post_epoch(), the asserts
    check (in debug builds) that no deferred work is left behind.
  */
  ~Ndb_schema_event_handler()
  {
    // There should be no work left to do...
    assert(m_post_epoch_handle_list.elements == 0);
    assert(m_post_epoch_ack_list.elements == 0);
  }
3790 
3791 
handle_event(Ndb * s_ndb,NdbEventOperation * pOp)3792   void handle_event(Ndb* s_ndb, NdbEventOperation *pOp)
3793   {
3794     DBUG_ENTER("handle_event");
3795 
3796     const Ndb_event_data *event_data=
3797       static_cast<const Ndb_event_data*>(pOp->getCustomData());
3798 
3799     if (!check_is_ndb_schema_event(event_data))
3800       DBUG_VOID_RETURN;
3801 
3802     const NDBEVENT::TableEvent ev_type= pOp->getEventType();
3803     switch (ev_type)
3804     {
3805     case NDBEVENT::TE_INSERT:
3806     case NDBEVENT::TE_UPDATE:
3807     {
3808       /* ndb_schema table, row INSERTed or UPDATEed*/
3809       Ndb_schema_op* schema_op=
3810         Ndb_schema_op::create(event_data, pOp->getAnyValue());
3811       handle_schema_op(schema_op);
3812       break;
3813     }
3814 
3815     case NDBEVENT::TE_DELETE:
3816       /* ndb_schema table, row DELETEd */
3817       break;
3818 
3819     case NDBEVENT::TE_CLUSTER_FAILURE:
3820       if (opt_ndb_extra_logging)
3821         sql_print_information("NDB Schema dist: cluster failure "
3822                               "at epoch %u/%u.",
3823                               (uint)(pOp->getGCI() >> 32),
3824                               (uint)(pOp->getGCI()));
3825       // fall through
3826     case NDBEVENT::TE_DROP:
3827       /* ndb_schema table DROPped */
3828       if (opt_ndb_extra_logging &&
3829           ndb_binlog_tables_inited && ndb_binlog_running)
3830         sql_print_information("NDB Binlog: ndb tables initially "
3831                               "read only on reconnect.");
3832 
3833       /* release the ndb_schema_share */
3834       native_mutex_lock(&ndb_schema_share_mutex);
3835       free_share(&ndb_schema_share);
3836       ndb_schema_share= 0;
3837       ndb_binlog_tables_inited= FALSE;
3838       ndb_binlog_is_ready= FALSE;
3839       native_mutex_unlock(&ndb_schema_share_mutex);
3840 
3841       ndb_tdc_close_cached_tables();
3842 
3843       ndb_handle_schema_change(m_thd, s_ndb, pOp, event_data);
3844       break;
3845 
3846     case NDBEVENT::TE_ALTER:
3847       /* ndb_schema table ALTERed */
3848       break;
3849 
3850     case NDBEVENT::TE_NODE_FAILURE:
3851     {
3852       /* Remove all subscribers for node */
3853       m_schema_dist_data.report_data_node_failure(pOp->getNdbdNodeId());
3854       (void) native_cond_signal(&injector_cond);
3855       break;
3856     }
3857 
3858     case NDBEVENT::TE_SUBSCRIBE:
3859     {
3860       /* Add node as subscriber */
3861       m_schema_dist_data.report_subscribe(pOp->getNdbdNodeId(), pOp->getReqNodeId());
3862       (void) native_cond_signal(&injector_cond);
3863       break;
3864     }
3865 
3866     case NDBEVENT::TE_UNSUBSCRIBE:
3867     {
3868       /* Remove node as subscriber */
3869       m_schema_dist_data.report_unsubscribe(pOp->getNdbdNodeId(), pOp->getReqNodeId());
3870       (void) native_cond_signal(&injector_cond);
3871       break;
3872     }
3873 
3874     default:
3875     {
3876       sql_print_error("NDB Schema dist: unknown event %u, ignoring...",
3877                       ev_type);
3878     }
3879     }
3880 
3881     DBUG_VOID_RETURN;
3882   }
3883 
3884 
post_epoch()3885   void post_epoch()
3886   {
3887     if (unlikely(m_post_epoch_handle_list.elements > 0))
3888     {
3889       // Set the flag used to check that functions are called at correct time
3890       m_post_epoch= true;
3891 
3892       /*
3893        process any operations that should be done after
3894        the epoch is complete
3895       */
3896       Ndb_schema_op* schema;
3897       while ((schema= m_post_epoch_handle_list.pop()))
3898       {
3899         handle_schema_op_post_epoch(schema);
3900       }
3901 
3902       /*
3903        process any operations that should be unlocked/acked after
3904        the epoch is complete
3905       */
3906       while ((schema= m_post_epoch_ack_list.pop()))
3907       {
3908         ack_schema_op(schema->db, schema->name,
3909                       schema->id, schema->version);
3910       }
3911     }
3912     // There should be no work left todo...
3913     assert(m_post_epoch_handle_list.elements == 0);
3914     assert(m_post_epoch_ack_list.elements == 0);
3915   }
3916 };
3917 
3918 /*********************************************************************
3919   Internal helper functions for handling of the cluster replication tables
3920   - ndb_binlog_index
3921   - ndb_apply_status
3922 *********************************************************************/
3923 
/*
  struct to hold the data to be inserted into the
  ndb_binlog_index table.

  Rows are chained through 'next'. The columns named below are the ones
  ndb_binlog_index_table__write_rows() stores each member into.
*/
struct ndb_binlog_index_row {
  ulonglong epoch;                    // NBICOL_EPOCH
  const char *start_master_log_file;  // NBICOL_START_FILE
  ulonglong start_master_log_pos;     // NBICOL_START_POS
  ulong n_inserts;                    // NBICOL_NUM_INSERTS
  ulong n_updates;                    // NBICOL_NUM_UPDATES
  ulong n_deletes;                    // NBICOL_NUM_DELETES
  ulong n_schemaops;                  // NBICOL_NUM_SCHEMAOPS

  // Only written when the table has the ORIG_SERVERID / ORIG_EPOCH columns
  ulong orig_server_id;               // NBICOL_ORIG_SERVERID
  ulonglong orig_epoch;               // NBICOL_ORIG_EPOCH

  ulong gci;                          // NBICOL_GCI

  // Only written when the table has the next log position columns
  const char *next_master_log_file;   // NBICOL_NEXT_FILE
  ulonglong next_master_log_pos;      // NBICOL_NEXT_POS

  // Next row in the list, NULL terminates the list
  struct ndb_binlog_index_row *next;
};
3947 
3948 
3949 /*
3950   Open the ndb_binlog_index table for writing
3951 */
3952 static int
ndb_binlog_index_table__open(THD * thd,TABLE ** ndb_binlog_index)3953 ndb_binlog_index_table__open(THD *thd,
3954                              TABLE **ndb_binlog_index)
3955 {
3956   const char *save_proc_info=
3957     thd_proc_info(thd, "Opening " NDB_REP_DB "." NDB_REP_TABLE);
3958 
3959   TABLE_LIST tables;
3960   tables.init_one_table(STRING_WITH_LEN(NDB_REP_DB),    // db
3961                         STRING_WITH_LEN(NDB_REP_TABLE), // name
3962                         NDB_REP_TABLE,                  // alias
3963                         TL_WRITE);                      // for write
3964 
3965   /* Only allow real table to be opened */
3966   tables.required_type= FRMTYPE_TABLE;
3967 
3968   const uint flags =
3969     MYSQL_LOCK_IGNORE_TIMEOUT; /* Wait for lock "infinitely" */
3970   if (open_and_lock_tables(thd, &tables, flags))
3971   {
3972     if (thd->killed)
3973       DBUG_PRINT("error", ("NDB Binlog: Opening ndb_binlog_index: killed"));
3974     else
3975       sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'",
3976                       thd->get_stmt_da()->mysql_errno(),
3977                       thd->get_stmt_da()->message_text());
3978     thd_proc_info(thd, save_proc_info);
3979     return -1;
3980   }
3981   *ndb_binlog_index= tables.table;
3982   thd_proc_info(thd, save_proc_info);
3983   return 0;
3984 }
3985 
3986 
/*
  Write rows to the ndb_binlog_index table

  thd  thread handle used for opening and writing the table
  row  first element of the list of rows to write, must not be NULL

  If the table has the ORIG_SERVERID / ORIG_EPOCH columns, one table
  row is written for each element in the list. Otherwise the operation
  counts of all elements are merged into the first element (which is
  modified) and a single table row is written.

  Binlogging is disabled while writing, and the table is closed and
  its locks released before returning.

  Returns 0 on success and -1 if the table could not be opened or a
  row could not be written.
*/
static int
ndb_binlog_index_table__write_rows(THD *thd,
                                   ndb_binlog_index_row *row)
{
  int error= 0;
  // Head of the list, holds the values which are common for all rows
  ndb_binlog_index_row *first= row;
  TABLE *ndb_binlog_index= 0;

  /*
    Assume this function is not called with an error set in thd
    (but clear for safety in release version)
   */
  assert(!thd->is_error());
  thd->clear_error();

  /*
    Turn off binlogging to prevent the table changes from being
    written to the binary log.
  */
  tmp_disable_binlog(thd);

  if (ndb_binlog_index_table__open(thd, &ndb_binlog_index))
  {
    if (thd->killed)
      DBUG_PRINT("error", ("NDB Binlog: Unable to lock table ndb_binlog_index, killed"));
    else
      sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
    error= -1;
    goto add_ndb_binlog_index_err;
  }

  // Set all columns to be written
  ndb_binlog_index->use_all_columns();

  do
  {
    // Values of the row being written, saved for the error message below
    ulonglong epoch= 0, orig_epoch= 0;
    uint orig_server_id= 0;

    // Initialize ndb_binlog_index->record[0]
    empty_record(ndb_binlog_index);

    // Start position and epoch are always taken from the first element
    ndb_binlog_index->field[NBICOL_START_POS]
      ->store(first->start_master_log_pos, true);
    ndb_binlog_index->field[NBICOL_START_FILE]
      ->store(first->start_master_log_file,
              (uint)strlen(first->start_master_log_file),
              &my_charset_bin);
    ndb_binlog_index->field[NBICOL_EPOCH]
      ->store(epoch= first->epoch, true);
    if (ndb_binlog_index->s->fields > NBICOL_ORIG_SERVERID)
    {
      /* Table has ORIG_SERVERID / ORIG_EPOCH columns.
       * Write rows with different ORIG_SERVERID / ORIG_EPOCH
       * separately
       */
      ndb_binlog_index->field[NBICOL_NUM_INSERTS]
        ->store(row->n_inserts, true);
      ndb_binlog_index->field[NBICOL_NUM_UPDATES]
        ->store(row->n_updates, true);
      ndb_binlog_index->field[NBICOL_NUM_DELETES]
        ->store(row->n_deletes, true);
      ndb_binlog_index->field[NBICOL_NUM_SCHEMAOPS]
        ->store(row->n_schemaops, true);
      ndb_binlog_index->field[NBICOL_ORIG_SERVERID]
        ->store(orig_server_id= row->orig_server_id, true);
      ndb_binlog_index->field[NBICOL_ORIG_EPOCH]
        ->store(orig_epoch= row->orig_epoch, true);
      ndb_binlog_index->field[NBICOL_GCI]
        ->store(first->gci, true);

      if (ndb_binlog_index->s->fields > NBICOL_NEXT_POS)
      {
        /* Table has next log pos fields, fill them in */
        ndb_binlog_index->field[NBICOL_NEXT_POS]
          ->store(first->next_master_log_pos, true);
        ndb_binlog_index->field[NBICOL_NEXT_FILE]
          ->store(first->next_master_log_file,
                  (uint)strlen(first->next_master_log_file),
                  &my_charset_bin);
      }
      // Advance before writing, 'row' now points past the row being written
      row= row->next;
    }
    else
    {
      /* Old schema : Table has no separate
       * ORIG_SERVERID / ORIG_EPOCH columns.
       * Merge operation counts and write one row
       */
      // Leaves 'row' as NULL, thus the outer loop ends after this write
      while ((row= row->next))
      {
        first->n_inserts+= row->n_inserts;
        first->n_updates+= row->n_updates;
        first->n_deletes+= row->n_deletes;
        first->n_schemaops+= row->n_schemaops;
      }
      ndb_binlog_index->field[NBICOL_NUM_INSERTS]
        ->store((ulonglong)first->n_inserts, true);
      ndb_binlog_index->field[NBICOL_NUM_UPDATES]
        ->store((ulonglong)first->n_updates, true);
      ndb_binlog_index->field[NBICOL_NUM_DELETES]
        ->store((ulonglong)first->n_deletes, true);
      ndb_binlog_index->field[NBICOL_NUM_SCHEMAOPS]
        ->store((ulonglong)first->n_schemaops, true);
    }

    error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0]);

    /* Fault injection to test logging */
    // Writes the same record once more in roughly one out of ten cases
    DBUG_EXECUTE_IF("ndb_injector_binlog_index_write_fail_random",
                    {
                      if ((((uint32) rand()) % 10) == 9)
                      {
                        sql_print_error("NDB Binlog: Injecting random write failure");
                        error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0]);
                      }
                    });

    if (error)
    {
      sql_print_error("NDB Binlog: Failed writing to ndb_binlog_index for epoch %u/%u "
                      " orig_server_id %u orig_epoch %u/%u "
                      "with error %d.",
                      uint(epoch >> 32), uint(epoch),
                      orig_server_id,
                      uint(orig_epoch >> 32), uint(orig_epoch),
                      error);

      /*
        Log the outcome for each element in the list: OK for those
        before the failed one, ERROR for the failed one and Discarded
        for those after it.
      */
      bool seen_error_row = false;
      ndb_binlog_index_row* cursor= first;
      do
      {
        char tmp[128];
        if (ndb_binlog_index->s->fields > NBICOL_ORIG_SERVERID)
          my_snprintf(tmp, sizeof(tmp), "%u/%u,%u,%u/%u",
                      uint(epoch >> 32), uint(epoch),
                      uint(cursor->orig_server_id),
                      uint(cursor->orig_epoch >> 32),
                      uint(cursor->orig_epoch));

        else
          my_snprintf(tmp, sizeof(tmp), "%u/%u", uint(epoch >> 32), uint(epoch));

        // 'row' has already been advanced past the failed element, thus
        // the failed element is the one whose next pointer equals 'row'
        bool error_row = (row == (cursor->next));
        sql_print_error("NDB Binlog: Writing row (%s) to ndb_binlog_index - %s",
                        tmp,
                        (error_row?"ERROR":
                         (seen_error_row?"Discarded":
                          "OK")));
        seen_error_row |= error_row;

      } while ((cursor = cursor->next));

      error= -1;
      goto add_ndb_binlog_index_err;
    }
  } while (row);

add_ndb_binlog_index_err:
  /*
    Explicitly commit or rollback the writes (although we normally
    use a non transactional engine for the ndb_binlog_index table).
    Note that the choice is based on the error state of thd, not on
    the local 'error' variable.
  */
  thd->get_stmt_da()->set_overwrite_status(true);
  thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
  thd->get_stmt_da()->set_overwrite_status(false);

  // Close the tables this thread has opened
  close_thread_tables(thd);

  /*
    There should be no need for rolling back transaction due to deadlock
    (since ndb_binlog_index is non transactional).
  */
  assert(! thd->transaction_rollback_request);

  // Release MDL locks on the opened table
  thd->mdl_context.release_transactional_locks();

  reenable_binlog(thd);
  return error;
}
4172 
4173 /*********************************************************************
4174   Functions for start, stop, wait for ndbcluster binlog thread
4175 *********************************************************************/
4176 
ndbcluster_binlog_start()4177 int ndbcluster_binlog_start()
4178 {
4179   DBUG_ENTER("ndbcluster_binlog_start");
4180 
4181   if (::server_id == 0)
4182   {
4183     sql_print_warning("NDB: server id set to zero - changes logged to "
4184                       "bin log with server id zero will be logged with "
4185                       "another server id by slave mysqlds");
4186   }
4187 
4188   /*
4189      Check that ServerId is not using the reserved bit or bits reserved
4190      for application use
4191   */
4192   if ((::server_id & 0x1 << 31) ||                             // Reserved bit
4193       !ndbcluster_anyvalue_is_serverid_in_range(::server_id))  // server_id_bits
4194   {
4195     sql_print_error("NDB: server id provided is too large to be represented in "
4196                     "opt_server_id_bits or is reserved");
4197     DBUG_RETURN(-1);
4198   }
4199 
4200   /*
4201      Check that v2 events are enabled if log-transaction-id is set
4202   */
4203   if (opt_ndb_log_transaction_id &&
4204       log_bin_use_v1_row_events)
4205   {
4206     sql_print_error("NDB: --ndb-log-transaction-id requires v2 Binlog row events "
4207                     "but server is using v1.");
4208     DBUG_RETURN(-1);
4209   }
4210 
4211   ndb_binlog_thread.init();
4212 
4213   native_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
4214   native_cond_init(&injector_cond);
4215   native_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
4216 
4217   // The binlog thread globals has been initied and should be freed
4218   ndbcluster_binlog_inited= 1;
4219 
4220   /* Start ndb binlog thread */
4221   if (ndb_binlog_thread.start())
4222   {
4223     DBUG_PRINT("error", ("Could not start ndb binlog thread"));
4224     DBUG_RETURN(-1);
4225   }
4226 
4227   DBUG_RETURN(0);
4228 }
4229 
4230 
4231 /**************************************************************
4232   Internal helper functions for creating/dropping ndb events
4233   used by the client sql threads
4234 **************************************************************/
4235 void
ndb_rep_event_name(String * event_name,const char * db,const char * tbl,my_bool full)4236 ndb_rep_event_name(String *event_name,const char *db, const char *tbl,
4237                    my_bool full)
4238 {
4239   if (full)
4240     event_name->set_ascii("REPLF$", 6);
4241   else
4242     event_name->set_ascii("REPL$", 5);
4243   event_name->append(db);
4244 #ifdef NDB_WIN32
4245   /*
4246    * Some bright spark decided that we should sometimes have backslashes.
4247    * This causes us pain as the event is db/table and not db\table so trying
4248    * to drop db\table when we meant db/table ends in the event lying around
4249    * after drop table, leading to all sorts of pain.
4250   */
4251   String backslash_sep(1);
4252   backslash_sep.set_ascii("\\",1);
4253 
4254   int bsloc;
4255   if((bsloc= event_name->strstr(backslash_sep,0))!=-1)
4256 	  event_name->replace(bsloc, 1, "/", 1);
4257 #endif
4258   if (tbl)
4259   {
4260     event_name->append('/');
4261     event_name->append(tbl);
4262   }
4263   DBUG_PRINT("info", ("ndb_rep_event_name: %s", event_name->c_ptr()));
4264 }
4265 
4266 #ifdef HAVE_NDB_BINLOG
4267 static void
set_binlog_flags(NDB_SHARE * share,Ndb_binlog_type ndb_binlog_type)4268 set_binlog_flags(NDB_SHARE *share,
4269                  Ndb_binlog_type ndb_binlog_type)
4270 {
4271   DBUG_ENTER("set_binlog_flags");
4272   switch (ndb_binlog_type)
4273   {
4274   case NBT_NO_LOGGING:
4275     DBUG_PRINT("info", ("NBT_NO_LOGGING"));
4276     set_binlog_nologging(share);
4277     DBUG_VOID_RETURN;
4278   case NBT_DEFAULT:
4279     DBUG_PRINT("info", ("NBT_DEFAULT"));
4280     if (opt_ndb_log_updated_only)
4281     {
4282       set_binlog_updated_only(share);
4283     }
4284     else
4285     {
4286       set_binlog_full(share);
4287     }
4288     if (opt_ndb_log_update_as_write)
4289     {
4290       set_binlog_use_write(share);
4291     }
4292     else
4293     {
4294       set_binlog_use_update(share);
4295     }
4296     if (opt_ndb_log_update_minimal)
4297     {
4298       set_binlog_update_minimal(share);
4299     }
4300     break;
4301   case NBT_UPDATED_ONLY:
4302     DBUG_PRINT("info", ("NBT_UPDATED_ONLY"));
4303     set_binlog_updated_only(share);
4304     set_binlog_use_write(share);
4305     break;
4306   case NBT_USE_UPDATE:
4307     DBUG_PRINT("info", ("NBT_USE_UPDATE"));
4308   case NBT_UPDATED_ONLY_USE_UPDATE:
4309     DBUG_PRINT("info", ("NBT_UPDATED_ONLY_USE_UPDATE"));
4310     set_binlog_updated_only(share);
4311     set_binlog_use_update(share);
4312     break;
4313   case NBT_FULL:
4314     DBUG_PRINT("info", ("NBT_FULL"));
4315     set_binlog_full(share);
4316     set_binlog_use_write(share);
4317     break;
4318   case NBT_FULL_USE_UPDATE:
4319     DBUG_PRINT("info", ("NBT_FULL_USE_UPDATE"));
4320     set_binlog_full(share);
4321     set_binlog_use_update(share);
4322     break;
4323   case NBT_UPDATED_ONLY_MINIMAL:
4324     DBUG_PRINT("info", ("NBT_UPDATED_ONLY_MINIMAL"));
4325     set_binlog_updated_only(share);
4326     set_binlog_use_update(share);
4327     set_binlog_update_minimal(share);
4328     break;
4329   case NBT_UPDATED_FULL_MINIMAL:
4330     DBUG_PRINT("info", ("NBT_UPDATED_FULL_MINIMAL"));
4331     set_binlog_full(share);
4332     set_binlog_use_update(share);
4333     set_binlog_update_minimal(share);
4334     break;
4335   default:
4336     DBUG_VOID_RETURN;
4337   }
4338   set_binlog_logging(share);
4339   DBUG_VOID_RETURN;
4340 }
4341 
4342 
4343 /*
4344   ndbcluster_get_binlog_replication_info
4345 
4346   This function retrieves the data for the given table
4347   from the ndb_replication table.
4348 
4349   If the table is not found, or the table does not exist,
4350   then defaults are returned.
4351 */
4352 int
ndbcluster_get_binlog_replication_info(THD * thd,Ndb * ndb,const char * db,const char * table_name,uint server_id,Uint32 * binlog_flags,const st_conflict_fn_def ** conflict_fn,st_conflict_fn_arg * args,Uint32 * num_args)4353 ndbcluster_get_binlog_replication_info(THD *thd, Ndb *ndb,
4354                                        const char* db,
4355                                        const char* table_name,
4356                                        uint server_id,
4357                                        Uint32* binlog_flags,
4358                                        const st_conflict_fn_def** conflict_fn,
4359                                        st_conflict_fn_arg* args,
4360                                        Uint32* num_args)
4361 {
4362   DBUG_ENTER("ndbcluster_get_binlog_replication_info");
4363 
4364   /* Override for ndb_apply_status when logging */
4365   if (opt_ndb_log_apply_status)
4366   {
4367     if (strcmp(db, NDB_REP_DB) == 0 &&
4368         strcmp(table_name, NDB_APPLY_TABLE) == 0)
4369     {
4370       /*
4371         Ensure that we get all columns from ndb_apply_status updates
4372         by forcing FULL event type
4373         Also, ensure that ndb_apply_status events are always logged as
4374         WRITES.
4375       */
4376       DBUG_PRINT("info", ("ndb_apply_status defaulting to FULL, USE_WRITE"));
4377       sql_print_information("NDB : ndb-log-apply-status forcing "
4378                             "%s.%s to FULL USE_WRITE",
4379                             NDB_REP_DB, NDB_APPLY_TABLE);
4380       *binlog_flags = NBT_FULL;
4381       *conflict_fn = NULL;
4382       *num_args = 0;
4383       DBUG_RETURN(0);
4384     }
4385   }
4386 
4387   Ndb_rep_tab_reader rep_tab_reader;
4388 
4389   int rc = rep_tab_reader.lookup(ndb,
4390                                  db,
4391                                  table_name,
4392                                  server_id);
4393 
4394   const char* msg = rep_tab_reader.get_warning_message();
4395   if (msg != NULL)
4396   {
4397     push_warning_printf(thd, Sql_condition::SL_WARNING,
4398                         ER_NDB_REPLICATION_SCHEMA_ERROR,
4399                         ER(ER_NDB_REPLICATION_SCHEMA_ERROR),
4400                         msg);
4401     sql_print_warning("NDB Binlog: %s",
4402                       msg);
4403   }
4404 
4405   if (rc != 0)
4406     DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
4407 
4408   *binlog_flags= rep_tab_reader.get_binlog_flags();
4409   const char* conflict_fn_spec= rep_tab_reader.get_conflict_fn_spec();
4410 
4411   if (conflict_fn_spec != NULL)
4412   {
4413     char msgbuf[ FN_REFLEN ];
4414     if (parse_conflict_fn_spec(conflict_fn_spec,
4415                                conflict_fn,
4416                                args,
4417                                num_args,
4418                                msgbuf,
4419                                sizeof(msgbuf)) != 0)
4420     {
4421         push_warning_printf(thd, Sql_condition::SL_WARNING,
4422                           ER_CONFLICT_FN_PARSE_ERROR,
4423                           ER(ER_CONFLICT_FN_PARSE_ERROR),
4424                           msgbuf);
4425 
4426       /*
4427         Log as well, useful for contexts where the thd's stack of
4428         warnings are ignored
4429       */
4430       if (opt_ndb_extra_logging)
4431       {
4432         sql_print_warning("NDB Slave: Table %s.%s : Parse error on conflict fn : %s",
4433                           db, table_name,
4434                           msgbuf);
4435       }
4436 
4437       DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
4438     }
4439   }
4440   else
4441   {
4442     /* No conflict function specified */
4443     conflict_fn= NULL;
4444     num_args= 0;
4445   }
4446 
4447   DBUG_RETURN(0);
4448 }
4449 
4450 int
ndbcluster_apply_binlog_replication_info(THD * thd,NDB_SHARE * share,const NDBTAB * ndbtab,const st_conflict_fn_def * conflict_fn,const st_conflict_fn_arg * args,Uint32 num_args,bool do_set_binlog_flags,Uint32 binlog_flags)4451 ndbcluster_apply_binlog_replication_info(THD *thd,
4452                                          NDB_SHARE *share,
4453                                          const NDBTAB* ndbtab,
4454                                          const st_conflict_fn_def* conflict_fn,
4455                                          const st_conflict_fn_arg* args,
4456                                          Uint32 num_args,
4457                                          bool do_set_binlog_flags,
4458                                          Uint32 binlog_flags)
4459 {
4460   DBUG_ENTER("ndbcluster_apply_binlog_replication_info");
4461   char tmp_buf[FN_REFLEN];
4462 
4463   if (do_set_binlog_flags)
4464   {
4465     DBUG_PRINT("info", ("Setting binlog flags to %u", binlog_flags));
4466     set_binlog_flags(share, (enum Ndb_binlog_type)binlog_flags);
4467   }
4468 
4469   if (conflict_fn != NULL)
4470   {
4471     if (setup_conflict_fn(get_thd_ndb(thd)->ndb,
4472                           &share->m_cfn_share,
4473                           share->db,
4474                           share->table_name,
4475                           ((share->flags & NSF_BLOB_FLAG) != 0),
4476                           get_binlog_use_update(share),
4477                           ndbtab,
4478                           tmp_buf, sizeof(tmp_buf),
4479                           conflict_fn,
4480                           args,
4481                           num_args) == 0)
4482     {
4483       if (opt_ndb_extra_logging)
4484       {
4485         sql_print_information("%s", tmp_buf);
4486       }
4487     }
4488     else
4489     {
4490       /*
4491         Dump setup failure message to error log
4492         for cases where thd warning stack is
4493         ignored
4494       */
4495       sql_print_warning("NDB Slave: Table %s.%s : %s",
4496                         share->db,
4497                         share->table_name,
4498                         tmp_buf);
4499 
4500       push_warning_printf(thd, Sql_condition::SL_WARNING,
4501                           ER_CONFLICT_FN_PARSE_ERROR,
4502                           ER(ER_CONFLICT_FN_PARSE_ERROR),
4503                           tmp_buf);
4504 
4505       DBUG_RETURN(-1);
4506     }
4507   }
4508   else
4509   {
4510     /* No conflict function specified */
4511     slave_reset_conflict_fn(share->m_cfn_share);
4512   }
4513 
4514   DBUG_RETURN(0);
4515 }
4516 
4517 int
ndbcluster_read_binlog_replication(THD * thd,Ndb * ndb,NDB_SHARE * share,const NDBTAB * ndbtab,uint server_id,bool do_set_binlog_flags)4518 ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
4519                                    NDB_SHARE *share,
4520                                    const NDBTAB *ndbtab,
4521                                    uint server_id,
4522                                    bool do_set_binlog_flags)
4523 {
4524   DBUG_ENTER("ndbcluster_read_binlog_replication");
4525   Uint32 binlog_flags;
4526   const st_conflict_fn_def* conflict_fn= NULL;
4527   st_conflict_fn_arg args[MAX_CONFLICT_ARGS];
4528   Uint32 num_args = MAX_CONFLICT_ARGS;
4529 
4530   if ((ndbcluster_get_binlog_replication_info(thd, ndb,
4531                                               share->db,
4532                                               share->table_name,
4533                                               server_id,
4534                                               &binlog_flags,
4535                                               &conflict_fn,
4536                                               args,
4537                                               &num_args) != 0) ||
4538       (ndbcluster_apply_binlog_replication_info(thd,
4539                                                 share,
4540                                                 ndbtab,
4541                                                 conflict_fn,
4542                                                 args,
4543                                                 num_args,
4544                                                 do_set_binlog_flags,
4545                                                 binlog_flags) != 0))
4546   {
4547     DBUG_RETURN(-1);
4548   }
4549 
4550   DBUG_RETURN(0);
4551 }
4552 #endif /* HAVE_NDB_BINLOG */
4553 
4554 bool
ndbcluster_check_if_local_table(const char * dbname,const char * tabname)4555 ndbcluster_check_if_local_table(const char *dbname, const char *tabname)
4556 {
4557   char key[FN_REFLEN + 1];
4558   char ndb_file[FN_REFLEN + 1];
4559 
4560   DBUG_ENTER("ndbcluster_check_if_local_table");
4561   build_table_filename(key, sizeof(key)-1, dbname, tabname, reg_ext, 0);
4562   build_table_filename(ndb_file, sizeof(ndb_file)-1,
4563                        dbname, tabname, ha_ndb_ext, 0);
4564   /* Check that any defined table is an ndb table */
4565   DBUG_PRINT("info", ("Looking for file %s and %s", key, ndb_file));
4566   if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK))
4567   {
4568     DBUG_PRINT("info", ("table file %s not on disk, local table", ndb_file));
4569 
4570 
4571     DBUG_RETURN(true);
4572   }
4573 
4574   DBUG_RETURN(false);
4575 }
4576 
4577 
4578 /*
4579   Common function for setting up everything for logging a table at
4580   create/discover.
4581 */
ndbcluster_create_binlog_setup(THD * thd,Ndb * ndb,const char * key,uint key_len,const char * db,const char * table_name,TABLE * table)4582 int ndbcluster_create_binlog_setup(THD *thd, Ndb *ndb, const char *key,
4583                                    uint key_len,
4584                                    const char *db,
4585                                    const char *table_name,
4586                                    TABLE * table)
4587 {
4588   DBUG_ENTER("ndbcluster_create_binlog_setup");
4589   DBUG_PRINT("enter",("key: %s  key_len: %d  %s.%s",
4590                       key, key_len, db, table_name));
4591   assert(! IS_NDB_BLOB_PREFIX(table_name));
4592   assert(strlen(key) == key_len);
4593 
4594   NDB_SHARE* share= get_share(key, table, true, false);
4595   if (share == 0)
4596   {
4597     /**
4598      * Failed to create share
4599      */
4600     DBUG_RETURN(-1);
4601   }
4602 
4603   native_mutex_lock(&share->mutex);
4604   if (get_binlog_nologging(share) || share->op != 0 || share->new_op != 0)
4605   {
4606     native_mutex_unlock(&share->mutex);
4607     free_share(&share);
4608     DBUG_RETURN(0); // replication already setup, or should not
4609   }
4610 
4611   if (!share->need_events(ndb_binlog_running))
4612   {
4613     set_binlog_nologging(share);
4614     native_mutex_unlock(&share->mutex);
4615     DBUG_RETURN(0);
4616   }
4617 
4618   while (share && !IS_TMP_PREFIX(table_name))
4619   {
4620     /*
4621       ToDo make sanity check of share so that the table is actually the same
4622       I.e. we need to do open file from frm in this case
4623       Currently awaiting this to be fixed in the 4.1 tree in the general
4624       case
4625     */
4626 
4627     /* Create the event in NDB */
4628     ndb->setDatabaseName(db);
4629 
4630     NDBDICT *dict= ndb->getDictionary();
4631     Ndb_table_guard ndbtab_g(dict, table_name);
4632     const NDBTAB *ndbtab= ndbtab_g.get_table();
4633     if (ndbtab == 0)
4634     {
4635       if (opt_ndb_extra_logging)
4636         sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
4637                               "%s, %d", key, dict->getNdbError().message,
4638                               dict->getNdbError().code);
4639       break; // error
4640     }
4641 #ifdef HAVE_NDB_BINLOG
4642     /*
4643      */
4644     ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab,
4645                                        ::server_id, TRUE);
4646 #endif
4647     /*
4648       check if logging turned off for this table
4649     */
4650     if ((share->flags & NSF_HIDDEN_PK) &&
4651         (share->flags & NSF_BLOB_FLAG) &&
4652         !(share->flags & NSF_NO_BINLOG))
4653     {
4654       DBUG_PRINT("NDB_SHARE", ("NSF_HIDDEN_PK && NSF_BLOB_FLAG -> NSF_NO_BINLOG"));
4655       share->flags |= NSF_NO_BINLOG;
4656     }
4657     if (get_binlog_nologging(share))
4658     {
4659       if (opt_ndb_extra_logging)
4660         sql_print_information("NDB Binlog: NOT logging %s",
4661                               share->key_string());
4662       native_mutex_unlock(&share->mutex);
4663       DBUG_RETURN(0);
4664     }
4665 
4666     String event_name(INJECTOR_EVENT_LEN);
4667     ndb_rep_event_name(&event_name, db, table_name, get_binlog_full(share));
4668     /*
4669       event should have been created by someone else,
4670       but let's make sure, and create if it doesn't exist
4671     */
4672     const NDBEVENT *ev= dict->getEvent(event_name.c_ptr());
4673     if (!ev)
4674     {
4675       if (ndbcluster_create_event(thd, ndb, ndbtab, event_name.c_ptr(), share))
4676       {
4677         sql_print_error("NDB Binlog: "
4678                         "FAILED CREATE (DISCOVER) TABLE Event: %s",
4679                         event_name.c_ptr());
4680         break; // error
4681       }
4682       if (opt_ndb_extra_logging)
4683         sql_print_information("NDB Binlog: "
4684                               "CREATE (DISCOVER) TABLE Event: %s",
4685                               event_name.c_ptr());
4686     }
4687     else
4688     {
4689       delete ev;
4690       if (opt_ndb_extra_logging)
4691         sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
4692                               event_name.c_ptr());
4693     }
4694 
4695     /*
4696       create the event operations for receiving logging events
4697     */
4698     if (ndbcluster_create_event_ops(thd, share,
4699                                     ndbtab, event_name.c_ptr()))
4700     {
4701       sql_print_error("NDB Binlog:"
4702                       "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
4703                       event_name.c_ptr());
4704       /* a warning has been issued to the client */
4705       break;
4706     }
4707     native_mutex_unlock(&share->mutex);
4708     DBUG_RETURN(0);
4709   }
4710 
4711   native_mutex_unlock(&share->mutex);
4712   free_share(&share);
4713   DBUG_RETURN(-1);
4714 }
4715 
4716 int
ndbcluster_create_event(THD * thd,Ndb * ndb,const NDBTAB * ndbtab,const char * event_name,NDB_SHARE * share,int push_warning)4717 ndbcluster_create_event(THD *thd, Ndb *ndb, const NDBTAB *ndbtab,
4718                         const char *event_name, NDB_SHARE *share,
4719                         int push_warning)
4720 {
4721   DBUG_ENTER("ndbcluster_create_event");
4722   DBUG_PRINT("enter", ("table: '%s', version: %d",
4723                       ndbtab->getName(), ndbtab->getObjectVersion()));
4724   DBUG_PRINT("enter", ("event: '%s', share->key: '%s'",
4725                        event_name, share->key_string()));
4726 
4727   // Never create event on table with temporary name
4728   assert(! IS_TMP_PREFIX(ndbtab->getName()));
4729   // Never create event on the blob table(s)
4730   assert(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
4731   assert(share);
4732 
4733   if (get_binlog_nologging(share))
4734   {
4735     if (opt_ndb_extra_logging && ndb_binlog_running)
4736       sql_print_information("NDB Binlog: NOT logging %s",
4737                             share->key_string());
4738     DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d",
4739                         share->flags, share->flags & NSF_NO_BINLOG));
4740     DBUG_RETURN(0);
4741   }
4742 
4743   ndb->setDatabaseName(share->db);
4744   NDBDICT *dict= ndb->getDictionary();
4745   NDBEVENT my_event(event_name);
4746   my_event.setTable(*ndbtab);
4747   my_event.addTableEvent(NDBEVENT::TE_ALL);
4748   if (share->flags & NSF_HIDDEN_PK)
4749   {
4750     if (share->flags & NSF_BLOB_FLAG)
4751     {
4752       sql_print_error("NDB Binlog: logging of table %s "
4753                       "with BLOB attribute and no PK is not supported",
4754                       share->key_string());
4755       if (push_warning)
4756         push_warning_printf(thd, Sql_condition::SL_WARNING,
4757                             ER_ILLEGAL_HA_CREATE_OPTION,
4758                             ER(ER_ILLEGAL_HA_CREATE_OPTION),
4759                             ndbcluster_hton_name,
4760                             "Binlog of table with BLOB attribute and no PK");
4761 
4762       share->flags|= NSF_NO_BINLOG;
4763       DBUG_RETURN(-1);
4764     }
4765     /* No primary key, subscribe for all attributes */
4766     my_event.setReport((NDBEVENT::EventReport)
4767                        (NDBEVENT::ER_ALL | NDBEVENT::ER_DDL));
4768     DBUG_PRINT("info", ("subscription all"));
4769   }
4770   else
4771   {
4772     if (strcmp(share->db, NDB_REP_DB) == 0 &&
4773         strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
4774     {
4775       /**
4776        * ER_SUBSCRIBE is only needed on NDB_SCHEMA_TABLE
4777        */
4778       my_event.setReport((NDBEVENT::EventReport)
4779                          (NDBEVENT::ER_ALL |
4780                           NDBEVENT::ER_SUBSCRIBE |
4781                           NDBEVENT::ER_DDL));
4782       DBUG_PRINT("info", ("subscription all and subscribe"));
4783     }
4784     else
4785     {
4786       if (get_binlog_full(share))
4787       {
4788         my_event.setReport((NDBEVENT::EventReport)
4789                            (NDBEVENT::ER_ALL | NDBEVENT::ER_DDL));
4790         DBUG_PRINT("info", ("subscription all"));
4791       }
4792       else
4793       {
4794         my_event.setReport((NDBEVENT::EventReport)
4795                            (NDBEVENT::ER_UPDATED | NDBEVENT::ER_DDL));
4796         DBUG_PRINT("info", ("subscription only updated"));
4797       }
4798     }
4799   }
4800   if (share->flags & NSF_BLOB_FLAG)
4801     my_event.mergeEvents(TRUE);
4802 
4803   /* add all columns to the event */
4804   int n_cols= ndbtab->getNoOfColumns();
4805   for(int a= 0; a < n_cols; a++)
4806     my_event.addEventColumn(a);
4807 
4808   if (dict->createEvent(my_event)) // Add event to database
4809   {
4810     if (dict->getNdbError().classification != NdbError::SchemaObjectExists)
4811     {
4812       /*
4813         failed, print a warning
4814       */
4815       if (push_warning > 1)
4816         push_warning_printf(thd, Sql_condition::SL_WARNING,
4817                             ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
4818                             dict->getNdbError().code,
4819                             dict->getNdbError().message, "NDB");
4820       sql_print_error("NDB Binlog: Unable to create event in database. "
4821                       "Event: %s  Error Code: %d  Message: %s", event_name,
4822                       dict->getNdbError().code, dict->getNdbError().message);
4823       DBUG_RETURN(-1);
4824     }
4825 
4826     /*
4827       try retrieving the event, if table version/id matches, we will get
4828       a valid event.  Otherwise we have a trailing event from before
4829     */
4830     const NDBEVENT *ev;
4831     if ((ev= dict->getEvent(event_name)))
4832     {
4833       delete ev;
4834       DBUG_RETURN(0);
4835     }
4836 
4837     /*
4838       trailing event from before; an error, but try to correct it
4839     */
4840     if (dict->getNdbError().code == NDB_INVALID_SCHEMA_OBJECT &&
4841         dict->dropEvent(my_event.getName(), 1))
4842     {
4843       if (push_warning > 1)
4844         push_warning_printf(thd, Sql_condition::SL_WARNING,
4845                             ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
4846                             dict->getNdbError().code,
4847                             dict->getNdbError().message, "NDB");
4848       sql_print_error("NDB Binlog: Unable to create event in database. "
4849                       " Attempt to correct with drop failed. "
4850                       "Event: %s Error Code: %d Message: %s",
4851                       event_name,
4852                       dict->getNdbError().code,
4853                       dict->getNdbError().message);
4854       DBUG_RETURN(-1);
4855     }
4856 
4857     /*
4858       try to add the event again
4859     */
4860     if (dict->createEvent(my_event))
4861     {
4862       if (push_warning > 1)
4863         push_warning_printf(thd, Sql_condition::SL_WARNING,
4864                             ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
4865                             dict->getNdbError().code,
4866                             dict->getNdbError().message, "NDB");
4867       sql_print_error("NDB Binlog: Unable to create event in database. "
4868                       " Attempt to correct with drop ok, but create failed. "
4869                       "Event: %s Error Code: %d Message: %s",
4870                       event_name,
4871                       dict->getNdbError().code,
4872                       dict->getNdbError().message);
4873       DBUG_RETURN(-1);
4874     }
4875   }
4876 
4877   DBUG_RETURN(0);
4878 }
4879 
4880 
is_ndb_compatible_type(Field * field)4881 inline int is_ndb_compatible_type(Field *field)
4882 {
4883   return
4884     !(field->flags & BLOB_FLAG) &&
4885     field->type() != MYSQL_TYPE_BIT &&
4886     field->pack_length() != 0;
4887 }
4888 
4889 /*
4890   - create eventOperations for receiving log events
4891   - setup ndb recattrs for reception of log event data
4892   - "start" the event operation
4893 
4894   used at create/discover of tables
4895 */
4896 int
ndbcluster_create_event_ops(THD * thd,NDB_SHARE * share,const NDBTAB * ndbtab,const char * event_name)4897 ndbcluster_create_event_ops(THD *thd, NDB_SHARE *share,
4898                             const NDBTAB *ndbtab, const char *event_name)
4899 {
4900   /*
4901     we are in either create table or rename table so table should be
4902     locked, hence we can work with the share without locks
4903   */
4904 
4905   DBUG_ENTER("ndbcluster_create_event_ops");
4906   DBUG_PRINT("enter", ("table: '%s' event: '%s', share->key: '%s'",
4907                        ndbtab->getName(), event_name, share->key_string()));
4908 
4909   // Never create event on table with temporary name
4910   assert(! IS_TMP_PREFIX(ndbtab->getName()));
4911   // Never create event on the blob table(s)
4912   assert(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
4913   assert(share);
4914 
4915   if (get_binlog_nologging(share))
4916   {
4917     DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x",
4918                         share->flags));
4919     DBUG_RETURN(0);
4920   }
4921 
4922   // Don't allow event ops to be created on distributed priv tables
4923   // they are distributed via ndb_schema
4924   assert(!Ndb_dist_priv_util::is_distributed_priv_table(share->db,
4925                                                         share->table_name));
4926 
4927   int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0;
4928   if (!ndb_schema_share && strcmp(share->db, NDB_REP_DB) == 0 &&
4929       strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
4930     do_ndb_schema_share= 1;
4931   else if (!ndb_apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
4932            strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
4933     do_ndb_apply_status_share= 1;
4934   else
4935 #ifdef HAVE_NDB_BINLOG
4936     if (!binlog_filter->db_ok(share->db) ||
4937         !ndb_binlog_running ||
4938         is_exceptions_table(share->table_name))
4939 #endif
4940   {
4941     share->flags|= NSF_NO_BINLOG;
4942     DBUG_RETURN(0);
4943   }
4944 
4945   // Check that the share agrees
4946   assert(share->need_events(ndb_binlog_running));
4947 
4948   Ndb_event_data *event_data= share->event_data;
4949   if (share->op)
4950   {
4951     event_data= (Ndb_event_data *) share->op->getCustomData();
4952     assert(event_data->share == share);
4953     assert(share->event_data == 0);
4954 
4955     assert(share->use_count > 1);
4956     sql_print_error("NDB Binlog: discover reusing old ev op");
4957     /* ndb_share reference ToDo free */
4958     DBUG_PRINT("NDB_SHARE", ("%s ToDo free  use_count: %u",
4959                              share->key_string(), share->use_count));
4960     free_share(&share); // old event op already has reference
4961     DBUG_RETURN(0);
4962   }
4963 
4964   assert(event_data != 0);
4965   TABLE *table= event_data->shadow_table;
4966 
4967   int retries= 100;
4968   /*
4969     100 milliseconds, temporary error on schema operation can
4970     take some time to be resolved
4971   */
4972   int retry_sleep= 100;
4973   while (1)
4974   {
4975     Mutex_guard injector_mutex_g(injector_mutex);
4976     Ndb *ndb= injector_ndb;
4977     if (do_ndb_schema_share)
4978       ndb= schema_ndb;
4979 
4980     if (ndb == 0)
4981       DBUG_RETURN(-1);
4982 
4983     NdbEventOperation* op;
4984     if (do_ndb_schema_share)
4985       op= ndb->createEventOperation(event_name);
4986     else
4987     {
4988       // set injector_ndb database/schema from table internal name
4989       int ret= ndb->setDatabaseAndSchemaName(ndbtab);
4990       assert(ret == 0); NDB_IGNORE_VALUE(ret);
4991       op= ndb->createEventOperation(event_name);
4992       // reset to catch errors
4993       ndb->setDatabaseName("");
4994     }
4995     if (!op)
4996     {
4997       sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
4998                       " %s",event_name);
4999       push_warning_printf(thd, Sql_condition::SL_WARNING,
5000                           ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5001                           ndb->getNdbError().code,
5002                           ndb->getNdbError().message,
5003                           "NDB");
5004       DBUG_RETURN(-1);
5005     }
5006 
5007     if (share->flags & NSF_BLOB_FLAG)
5008       op->mergeEvents(TRUE); // currently not inherited from event
5009 
5010     const uint n_columns= ndbtab->getNoOfColumns();
5011     const uint n_fields= table->s->fields;
5012     const uint val_length= sizeof(NdbValue) * n_columns;
5013 
5014     /*
5015        Allocate memory globally so it can be reused after online alter table
5016     */
5017     if (my_multi_malloc(PSI_INSTRUMENT_ME,
5018                         MYF(MY_WME),
5019                         &event_data->ndb_value[0],
5020                         val_length,
5021                         &event_data->ndb_value[1],
5022                         val_length,
5023                         NULL) == 0)
5024     {
5025       DBUG_PRINT("info", ("Failed to allocate records for event operation"));
5026       DBUG_RETURN(-1);
5027     }
5028 
5029     for (uint j= 0; j < n_columns; j++)
5030     {
5031       const char *col_name= ndbtab->getColumn(j)->getName();
5032       NdbValue attr0, attr1;
5033       if (j < n_fields)
5034       {
5035         Field *f= table->field[j];
5036         if (is_ndb_compatible_type(f))
5037         {
5038           DBUG_PRINT("info", ("%s compatible", col_name));
5039           attr0.rec= op->getValue(col_name, (char*) f->ptr);
5040           attr1.rec= op->getPreValue(col_name,
5041                                      (f->ptr - table->record[0]) +
5042                                      (char*) table->record[1]);
5043         }
5044         else if (! (f->flags & BLOB_FLAG))
5045         {
5046           DBUG_PRINT("info", ("%s non compatible", col_name));
5047           attr0.rec= op->getValue(col_name);
5048           attr1.rec= op->getPreValue(col_name);
5049         }
5050         else
5051         {
5052           DBUG_PRINT("info", ("%s blob", col_name));
5053           assert(share->flags & NSF_BLOB_FLAG);
5054           attr0.blob= op->getBlobHandle(col_name);
5055           attr1.blob= op->getPreBlobHandle(col_name);
5056           if (attr0.blob == NULL || attr1.blob == NULL)
5057           {
5058             sql_print_error("NDB Binlog: Creating NdbEventOperation"
5059                             " blob field %u handles failed (code=%d) for %s",
5060                             j, op->getNdbError().code, event_name);
5061             push_warning_printf(thd, Sql_condition::SL_WARNING,
5062                                 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5063                                 op->getNdbError().code,
5064                                 op->getNdbError().message,
5065                                 "NDB");
5066             ndb->dropEventOperation(op);
5067             DBUG_RETURN(-1);
5068           }
5069         }
5070       }
5071       else
5072       {
5073         DBUG_PRINT("info", ("%s hidden key", col_name));
5074         attr0.rec= op->getValue(col_name);
5075         attr1.rec= op->getPreValue(col_name);
5076       }
5077       event_data->ndb_value[0][j].ptr= attr0.ptr;
5078       event_data->ndb_value[1][j].ptr= attr1.ptr;
5079       DBUG_PRINT("info", ("&event_data->ndb_value[0][%d]: 0x%lx  "
5080                           "event_data->ndb_value[0][%d]: 0x%lx",
5081                           j, (long) &event_data->ndb_value[0][j],
5082                           j, (long) attr0.ptr));
5083       DBUG_PRINT("info", ("&event_data->ndb_value[1][%d]: 0x%lx  "
5084                           "event_data->ndb_value[1][%d]: 0x%lx",
5085                           j, (long) &event_data->ndb_value[0][j],
5086                           j, (long) attr1.ptr));
5087     }
5088     op->setCustomData((void *) event_data); // set before execute
5089     share->event_data= 0;                   // take over event data
5090     share->op= op; // assign op in NDB_SHARE
5091 
5092     /* Check if user explicitly requires monitoring of empty updates */
5093     if (opt_ndb_log_empty_update)
5094       op->setAllowEmptyUpdate(true);
5095 
5096     if (op->execute())
5097     {
5098       share->op= NULL;
5099       retries--;
5100       if (op->getNdbError().status != NdbError::TemporaryError &&
5101           op->getNdbError().code != 1407)
5102         retries= 0;
5103       if (retries == 0)
5104       {
5105         push_warning_printf(thd, Sql_condition::SL_WARNING,
5106                             ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5107                             op->getNdbError().code, op->getNdbError().message,
5108                             "NDB");
5109         sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
5110                         event_name,
5111                         op->getNdbError().code, op->getNdbError().message);
5112       }
5113       share->event_data= event_data;
5114       op->setCustomData(NULL);
5115       ndb->dropEventOperation(op);
5116       if (retries && !thd->killed)
5117       {
5118         do_retry_sleep(retry_sleep);
5119         continue;
5120       }
5121       DBUG_RETURN(-1);
5122     }
5123     break;
5124   }
5125 
5126   /* ndb_share reference binlog */
5127   get_share(share);
5128   DBUG_PRINT("NDB_SHARE", ("%s binlog  use_count: %u",
5129                            share->key_string(), share->use_count));
5130   if (do_ndb_apply_status_share)
5131   {
5132     /* ndb_share reference binlog extra */
5133     ndb_apply_status_share= get_share(share);
5134     DBUG_PRINT("NDB_SHARE", ("%s binlog extra  use_count: %u",
5135                              share->key_string(), share->use_count));
5136     (void) native_cond_signal(&injector_cond);
5137   }
5138   else if (do_ndb_schema_share)
5139   {
5140     /* ndb_share reference binlog extra */
5141     ndb_schema_share= get_share(share);
5142     DBUG_PRINT("NDB_SHARE", ("%s binlog extra  use_count: %u",
5143                              share->key_string(), share->use_count));
5144     (void) native_cond_signal(&injector_cond);
5145   }
5146 
5147   DBUG_PRINT("info",("%s share->op: 0x%lx  share->use_count: %u",
5148                      share->key_string(), (long) share->op,
5149                      share->use_count));
5150 
5151   if (opt_ndb_extra_logging)
5152     sql_print_information("NDB Binlog: logging %s (%s,%s)",
5153                           share->key_string(),
5154                           get_binlog_full(share) ? "FULL" : "UPDATED",
5155                           get_binlog_use_update(share) ? "USE_UPDATE" : "USE_WRITE");
5156   DBUG_RETURN(0);
5157 }
5158 
5159 int
ndbcluster_drop_event(THD * thd,Ndb * ndb,NDB_SHARE * share,const char * dbname,const char * tabname)5160 ndbcluster_drop_event(THD *thd, Ndb *ndb, NDB_SHARE *share,
5161                       const char *dbname,
5162                       const char *tabname)
5163 {
5164   DBUG_ENTER("ndbcluster_drop_event");
5165   /*
5166     There might be 2 types of events setup for the table, we cannot know
5167     which ones are supposed to be there as they may have been created
5168     differently for different mysqld's.  So we drop both
5169   */
5170   for (uint i= 0; i < 2; i++)
5171   {
5172     NDBDICT *dict= ndb->getDictionary();
5173     String event_name(INJECTOR_EVENT_LEN);
5174     ndb_rep_event_name(&event_name, dbname, tabname, i);
5175 
5176     if (!dict->dropEvent(event_name.c_ptr()))
5177       continue;
5178 
5179     if (dict->getNdbError().code != 4710 &&
5180         dict->getNdbError().code != 1419)
5181     {
5182       /* drop event failed for some reason, issue a warning */
5183       push_warning_printf(thd, Sql_condition::SL_WARNING,
5184                           ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5185                           dict->getNdbError().code,
5186                           dict->getNdbError().message, "NDB");
5187       /* error is not that the event did not exist */
5188       sql_print_error("NDB Binlog: Unable to drop event in database. "
5189                       "Event: %s Error Code: %d Message: %s",
5190                       event_name.c_ptr(),
5191                       dict->getNdbError().code,
5192                       dict->getNdbError().message);
5193       /* ToDo; handle error? */
5194       if (share && share->op &&
5195           share->op->getState() == NdbEventOperation::EO_EXECUTING &&
5196           dict->getNdbError().mysql_code != HA_ERR_NO_CONNECTION)
5197       {
5198         assert(FALSE);
5199         DBUG_RETURN(-1);
5200       }
5201     }
5202   }
5203   DBUG_RETURN(0);
5204 }
5205 
/*
  When entering, the calling thread should have a share lock if share != 0;
  then the injector thread will have one as well, i.e. share->use_count == 0
  (unless it has already dropped... then share->op == 0)
*/
5211 
5212 int
ndbcluster_handle_drop_table(THD * thd,Ndb * ndb,NDB_SHARE * share,const char * type_str,const char * dbname,const char * tabname)5213 ndbcluster_handle_drop_table(THD *thd, Ndb *ndb, NDB_SHARE *share,
5214                              const char *type_str,
5215                              const char * dbname, const char * tabname)
5216 {
5217   DBUG_ENTER("ndbcluster_handle_drop_table");
5218 
5219   if (dbname && tabname)
5220   {
5221     if (ndbcluster_drop_event(thd, ndb, share, dbname, tabname))
5222       DBUG_RETURN(-1);
5223   }
5224 
5225   if (share == 0 || share->op == 0)
5226   {
5227     DBUG_RETURN(0);
5228   }
5229 
5230 /*
5231   Syncronized drop between client thread and injector thread is
5232   neccessary in order to maintain ordering in the binlog,
5233   such that the drop occurs _after_ any inserts/updates/deletes.
5234 
5235   The penalty for this is that the drop table becomes slow.
5236 
5237   This wait is however not strictly neccessary to produce a binlog
5238   that is usable.  However the slave does not currently handle
5239   these out of order, thus we are keeping the SYNC_DROP_ defined
5240   for now.
5241 */
5242   const char *save_proc_info= thd->proc_info;
5243 #define SYNC_DROP_
5244 #ifdef SYNC_DROP_
5245   thd->proc_info= "Syncing ndb table schema operation and binlog";
5246   native_mutex_lock(&share->mutex);
5247   int max_timeout= DEFAULT_SYNC_TIMEOUT;
5248   while (share->op)
5249   {
5250     struct timespec abstime;
5251     set_timespec(&abstime, 1);
5252 
5253     // Unlock the share and wait for injector to signal that
5254     // something has happened. (NOTE! convoluted in order to
5255     // only use injector_cond with injector_mutex)
5256     native_mutex_unlock(&share->mutex);
5257     native_mutex_lock(&injector_mutex);
5258     int ret= native_cond_timedwait(&injector_cond,
5259                                     &injector_mutex,
5260                                     &abstime);
5261     native_mutex_unlock(&injector_mutex);
5262     native_mutex_lock(&share->mutex);
5263 
5264     if (thd->killed ||
5265         share->op == 0)
5266       break;
5267     if (ret)
5268     {
5269       max_timeout--;
5270       if (max_timeout == 0)
5271       {
5272         sql_print_error("NDB %s: %s timed out. Ignoring...",
5273                         type_str, share->key_string());
5274         assert(false);
5275         break;
5276       }
5277       if (opt_ndb_extra_logging)
5278         ndb_report_waiting(type_str, max_timeout,
5279                            type_str, share->key_string(), 0);
5280     }
5281   }
5282   native_mutex_unlock(&share->mutex);
5283 #else
5284   native_mutex_lock(&share->mutex);
5285   share->op= 0;
5286   native_mutex_unlock(&share->mutex);
5287 #endif
5288   thd->proc_info= save_proc_info;
5289 
5290   DBUG_RETURN(0);
5291 }
5292 
5293 
/********************************************************************
  Internal helper functions for handling the different events from the
  storage nodes, used by the ndb injector thread
********************************************************************/
5298 
5299 /*
5300   Unpack a record read from NDB
5301 
5302   SYNOPSIS
5303     ndb_unpack_record()
5304     buf                 Buffer to store read row
5305 
5306   NOTE
5307     The data for each row is read directly into the
5308     destination buffer. This function is primarily
5309     called in order to check if any fields should be
5310     set to null.
5311 */
5312 
ndb_unpack_record(TABLE * table,NdbValue * value,MY_BITMAP * defined,uchar * buf)5313 static void ndb_unpack_record(TABLE *table, NdbValue *value,
5314                               MY_BITMAP *defined, uchar *buf)
5315 {
5316   Field **p_field= table->field, *field= *p_field;
5317   my_ptrdiff_t row_offset= (my_ptrdiff_t) (buf - table->record[0]);
5318   my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
5319   DBUG_ENTER("ndb_unpack_record");
5320 
5321   /*
5322     Set the filler bits of the null byte, since they are
5323     not touched in the code below.
5324 
5325     The filler bits are the MSBs in the last null byte
5326   */
5327   if (table->s->null_bytes > 0)
5328        buf[table->s->null_bytes - 1]|= 256U - (1U <<
5329 					       table->s->last_null_bit_pos);
5330   /*
5331     Set null flag(s)
5332   */
5333   for ( ; field;
5334        p_field++, value++, field= *p_field)
5335   {
5336     field->set_notnull(row_offset);
5337     if ((*value).ptr)
5338     {
5339       if (!(field->flags & BLOB_FLAG))
5340       {
5341         int is_null= (*value).rec->isNULL();
5342         if (is_null)
5343         {
5344           if (is_null > 0)
5345           {
5346             DBUG_PRINT("info",("[%u] NULL", field->field_index));
5347             field->set_null(row_offset);
5348           }
5349           else
5350           {
5351             DBUG_PRINT("info",("[%u] UNDEFINED", field->field_index));
5352             bitmap_clear_bit(defined, field->field_index);
5353           }
5354         }
5355         else if (field->type() == MYSQL_TYPE_BIT)
5356         {
5357           Field_bit *field_bit= static_cast<Field_bit*>(field);
5358 
5359           /*
5360             Move internal field pointer to point to 'buf'.  Calling
5361             the correct member function directly since we know the
5362             type of the object.
5363            */
5364           field_bit->Field_bit::move_field_offset(row_offset);
5365           if (field->pack_length() < 5)
5366           {
5367             DBUG_PRINT("info", ("bit field H'%.8X",
5368                                 (*value).rec->u_32_value()));
5369             field_bit->Field_bit::store((longlong) (*value).rec->u_32_value(),
5370                                         TRUE);
5371           }
5372           else
5373           {
5374             DBUG_PRINT("info", ("bit field H'%.8X%.8X",
5375                                 *(Uint32 *)(*value).rec->aRef(),
5376                                 *((Uint32 *)(*value).rec->aRef()+1)));
5377 #ifdef WORDS_BIGENDIAN
5378             /* lsw is stored first */
5379             Uint32 *buf= (Uint32 *)(*value).rec->aRef();
5380             field_bit->Field_bit::store((((longlong)*buf)
5381                                          & 0x00000000FFFFFFFFLL)
5382                                         |
5383                                         ((((longlong)*(buf+1)) << 32)
5384                                          & 0xFFFFFFFF00000000LL),
5385                                         TRUE);
5386 #else
5387             field_bit->Field_bit::store((longlong)
5388                                         (*value).rec->u_64_value(), TRUE);
5389 #endif
5390           }
5391           /*
5392             Move back internal field pointer to point to original
5393             value (usually record[0]).
5394            */
5395           field_bit->Field_bit::move_field_offset(-row_offset);
5396           DBUG_PRINT("info",("[%u] SET",
5397                              (*value).rec->getColumn()->getColumnNo()));
5398           DBUG_DUMP("info", (const uchar*) field->ptr, field->pack_length());
5399         }
5400         else
5401         {
5402           DBUG_PRINT("info",("[%u] SET",
5403                              (*value).rec->getColumn()->getColumnNo()));
5404           DBUG_DUMP("info", (const uchar*) field->ptr, field->pack_length());
5405         }
5406       }
5407       else
5408       {
5409         NdbBlob *ndb_blob= (*value).blob;
5410         uint col_no= field->field_index;
5411         int isNull;
5412         ndb_blob->getDefined(isNull);
5413         if (isNull == 1)
5414         {
5415           DBUG_PRINT("info",("[%u] NULL", col_no));
5416           field->set_null(row_offset);
5417         }
5418         else if (isNull == -1)
5419         {
5420           DBUG_PRINT("info",("[%u] UNDEFINED", col_no));
5421           bitmap_clear_bit(defined, col_no);
5422         }
5423         else
5424         {
5425 #ifndef NDEBUG
5426           // pointer vas set in get_ndb_blobs_value
5427           Field_blob *field_blob= (Field_blob*)field;
5428           uchar* ptr;
5429           field_blob->get_ptr(&ptr, row_offset);
5430           uint32 len= field_blob->get_length(row_offset);
5431           DBUG_PRINT("info",("[%u] SET ptr: 0x%lx  len: %u",
5432                              col_no, (long) ptr, len));
5433 #endif
5434         }
5435       }
5436     }
5437   }
5438   dbug_tmp_restore_column_map(table->write_set, old_map);
5439   DBUG_VOID_RETURN;
5440 }
5441 
5442 /*
5443   Handle error states on events from the storage nodes
5444 */
5445 static int
handle_error(NdbEventOperation * pOp)5446 handle_error(NdbEventOperation *pOp)
5447 {
5448   Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
5449   NDB_SHARE *share= event_data->share;
5450   DBUG_ENTER("handle_error");
5451 
5452   sql_print_error("NDB Binlog: unhandled error %d for table %s",
5453                   pOp->hasError(), share->key_string());
5454   pOp->clearError();
5455   DBUG_RETURN(0);
5456 }
5457 
5458 
5459 /*
5460   Handle _non_ data events from the storage nodes
5461 */
5462 
5463 static
5464 void
handle_non_data_event(THD * thd,NdbEventOperation * pOp,ndb_binlog_index_row & row)5465 handle_non_data_event(THD *thd,
5466                       NdbEventOperation *pOp,
5467                       ndb_binlog_index_row &row)
5468 {
5469   const Ndb_event_data* event_data=
5470     static_cast<const Ndb_event_data*>(pOp->getCustomData());
5471   NDB_SHARE *share= event_data->share;
5472   const NDBEVENT::TableEvent type= pOp->getEventType();
5473 
5474   DBUG_ENTER("handle_non_data_event");
5475   DBUG_PRINT("enter", ("pOp: %p, event_data: %p, share: %p",
5476                        pOp, event_data, share));
5477   DBUG_PRINT("enter", ("type: %d", type));
5478 
5479   if (type == NDBEVENT::TE_DROP ||
5480       type == NDBEVENT::TE_ALTER)
5481   {
5482     // Count schema events
5483     row.n_schemaops++;
5484   }
5485 
5486   switch (type)
5487   {
5488   case NDBEVENT::TE_CLUSTER_FAILURE:
5489     if (opt_ndb_extra_logging)
5490       sql_print_information("NDB Binlog: cluster failure for %s at epoch %u/%u.",
5491                             share->key_string(),
5492                             (uint)(pOp->getGCI() >> 32),
5493                             (uint)(pOp->getGCI()));
5494     // fallthrough
5495   case NDBEVENT::TE_DROP:
5496     if (ndb_apply_status_share == share)
5497     {
5498       if (opt_ndb_extra_logging &&
5499           ndb_binlog_tables_inited && ndb_binlog_running)
5500         sql_print_information("NDB Binlog: ndb tables initially "
5501                               "read only on reconnect.");
5502 
5503       /* release the ndb_apply_status_share */
5504       free_share(&ndb_apply_status_share);
5505       ndb_apply_status_share= 0;
5506       ndb_binlog_tables_inited= FALSE;
5507     }
5508 
5509     ndb_handle_schema_change(thd, injector_ndb, pOp, event_data);
5510     break;
5511 
5512   case NDBEVENT::TE_ALTER:
5513     DBUG_PRINT("info", ("TE_ALTER"));
5514     break;
5515 
5516   case NDBEVENT::TE_NODE_FAILURE:
5517   case NDBEVENT::TE_SUBSCRIBE:
5518   case NDBEVENT::TE_UNSUBSCRIBE:
5519     /* ignore */
5520     break;
5521 
5522   default:
5523     sql_print_error("NDB Binlog: unknown non data event %d for %s. "
5524                     "Ignoring...", (unsigned) type, share->key_string());
5525     break;
5526   }
5527 
5528   DBUG_VOID_RETURN;
5529 }
5530 
5531 /*
5532   Handle data events from the storage nodes
5533 */
5534 inline ndb_binlog_index_row *
ndb_find_binlog_index_row(ndb_binlog_index_row ** rows,uint orig_server_id,int flag)5535 ndb_find_binlog_index_row(ndb_binlog_index_row **rows,
5536                           uint orig_server_id, int flag)
5537 {
5538   ndb_binlog_index_row *row= *rows;
5539   if (opt_ndb_log_orig)
5540   {
5541     ndb_binlog_index_row *first= row, *found_id= 0;
5542     for (;;)
5543     {
5544       if (row->orig_server_id == orig_server_id)
5545       {
5546         /* */
5547         if (!flag || !row->orig_epoch)
5548           return row;
5549         if (!found_id)
5550           found_id= row;
5551       }
5552       if (row->orig_server_id == 0)
5553         break;
5554       row= row->next;
5555       if (row == NULL)
5556       {
5557         row= (ndb_binlog_index_row*)sql_alloc(sizeof(ndb_binlog_index_row));
5558         memset(row, 0, sizeof(ndb_binlog_index_row));
5559         row->next= first;
5560         *rows= row;
5561         if (found_id)
5562         {
5563           /*
5564             If we found index_row with same server id already
5565             that row will contain the current stats.
5566             Copy stats over to new and reset old.
5567           */
5568           row->n_inserts= found_id->n_inserts;
5569           row->n_updates= found_id->n_updates;
5570           row->n_deletes= found_id->n_deletes;
5571           found_id->n_inserts= 0;
5572           found_id->n_updates= 0;
5573           found_id->n_deletes= 0;
5574         }
5575         /* keep track of schema ops only on "first" index_row */
5576         row->n_schemaops= first->n_schemaops;
5577         first->n_schemaops= 0;
5578         break;
5579       }
5580     }
5581     row->orig_server_id= orig_server_id;
5582   }
5583   return row;
5584 }
5585 
5586 
5587 static int
handle_data_event(THD * thd,Ndb * ndb,NdbEventOperation * pOp,ndb_binlog_index_row ** rows,injector::transaction & trans,unsigned & trans_row_count,unsigned & trans_slave_row_count)5588 handle_data_event(THD* thd, Ndb *ndb, NdbEventOperation *pOp,
5589                   ndb_binlog_index_row **rows,
5590                   injector::transaction &trans,
5591                   unsigned &trans_row_count,
5592                   unsigned &trans_slave_row_count)
5593 {
5594   Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
5595   TABLE *table= event_data->shadow_table;
5596   NDB_SHARE *share= event_data->share;
5597   bool reflected_op = false;
5598   bool refresh_op = false;
5599   bool read_op = false;
5600 
5601   if (pOp != share->op)
5602   {
5603     return 0;
5604   }
5605 
5606   uint32 anyValue= pOp->getAnyValue();
5607   if (ndbcluster_anyvalue_is_reserved(anyValue))
5608   {
5609     if (ndbcluster_anyvalue_is_nologging(anyValue))
5610       return 0;
5611 
5612     if (ndbcluster_anyvalue_is_reflect_op(anyValue))
5613     {
5614       DBUG_PRINT("info", ("Anyvalue -> Reflect (%u)", anyValue));
5615       reflected_op = true;
5616       anyValue = 0;
5617     }
5618     else if (ndbcluster_anyvalue_is_refresh_op(anyValue))
5619     {
5620       DBUG_PRINT("info", ("Anyvalue -> Refresh"));
5621       refresh_op = true;
5622       anyValue = 0;
5623     }
5624     else if (ndbcluster_anyvalue_is_read_op(anyValue))
5625     {
5626       DBUG_PRINT("info", ("Anyvalue -> Read"));
5627       read_op = true;
5628       anyValue = 0;
5629     }
5630     else
5631     {
5632       sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
5633                         "event not logged",
5634                         anyValue);
5635       return 0;
5636     }
5637   }
5638 
5639   uint32 originating_server_id= ndbcluster_anyvalue_get_serverid(anyValue);
5640   bool log_this_slave_update = g_ndb_log_slave_updates;
5641   bool count_this_event = true;
5642 
5643   if (share == ndb_apply_status_share)
5644   {
5645     /*
5646        Note that option values are read without synchronisation w.r.t.
5647        thread setting option variable or epoch boundaries.
5648     */
5649     if (opt_ndb_log_apply_status ||
5650         opt_ndb_log_orig)
5651     {
5652       Uint32 ndb_apply_status_logging_server_id= originating_server_id;
5653       Uint32 ndb_apply_status_server_id= 0;
5654       Uint64 ndb_apply_status_epoch= 0;
5655       bool event_has_data = false;
5656 
5657       switch(pOp->getEventType())
5658       {
5659       case NDBEVENT::TE_INSERT:
5660       case NDBEVENT::TE_UPDATE:
5661         event_has_data = true;
5662         break;
5663 
5664       case NDBEVENT::TE_DELETE:
5665         break;
5666       default:
5667         /* We should REALLY never get here */
5668         abort();
5669       }
5670 
5671       if (likely( event_has_data ))
5672       {
5673         /* unpack data to fetch orig_server_id and orig_epoch */
5674         uint n_fields= table->s->fields;
5675         MY_BITMAP b;
5676         uint32 bitbuf[128 / (sizeof(uint32) * 8)];
5677         bitmap_init(&b, bitbuf, n_fields, FALSE);
5678         bitmap_set_all(&b);
5679         ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
5680         ndb_apply_status_server_id= (uint)((Field_long *)table->field[0])->val_int();
5681         ndb_apply_status_epoch= ((Field_longlong *)table->field[1])->val_int();
5682 
5683         if (opt_ndb_log_apply_status)
5684         {
5685           /*
5686              Determine if event came from our immediate Master server
5687              Ignore locally manually sourced and reserved events
5688           */
5689           if ((ndb_apply_status_logging_server_id != 0) &&
5690               (! ndbcluster_anyvalue_is_reserved(ndb_apply_status_logging_server_id)))
5691           {
5692             bool isFromImmediateMaster = (ndb_apply_status_server_id ==
5693                                           ndb_apply_status_logging_server_id);
5694 
5695             if (isFromImmediateMaster)
5696             {
5697               /*
5698                  We log this event with our server-id so that it
5699                  propagates back to the originating Master (our
5700                  immediate Master)
5701               */
5702               assert(ndb_apply_status_logging_server_id != ::server_id);
5703 
5704               originating_server_id= 0; /* Will be set to our ::serverid below */
5705             }
5706           }
5707         }
5708 
5709         if (opt_ndb_log_orig)
5710         {
5711           /* store */
5712           ndb_binlog_index_row *row= ndb_find_binlog_index_row
5713             (rows, ndb_apply_status_server_id, 1);
5714           row->orig_epoch= ndb_apply_status_epoch;
5715         }
5716       }
5717     } // opt_ndb_log_apply_status || opt_ndb_log_orig)
5718 
5719     if (opt_ndb_log_apply_status)
5720     {
5721       /* We are logging ndb_apply_status changes
5722        * Don't count this event as making an epoch non-empty
5723        * Log this event in the Binlog
5724        */
5725       count_this_event = false;
5726       log_this_slave_update = true;
5727     }
5728     else
5729     {
5730       /* Not logging ndb_apply_status updates, discard this event now */
5731       return 0;
5732     }
5733   }
5734 
5735   if (originating_server_id == 0)
5736     originating_server_id= ::server_id;
5737   else
5738   {
5739     assert(!reflected_op && !refresh_op);
5740     /* Track that we received a replicated row event */
5741     if (likely( count_this_event ))
5742       trans_slave_row_count++;
5743 
5744     if (!log_this_slave_update)
5745     {
5746       /*
5747         This event comes from a slave applier since it has an originating
5748         server id set. Since option to log slave updates is not set, skip it.
5749       */
5750       return 0;
5751     }
5752   }
5753 
5754   /*
5755      Start with logged_server_id as AnyValue in case it's a composite
5756      (server_id_bits < 31).  This way any user-values are passed-through
5757      to the Binlog in the high bits of the event's Server Id.
5758      In future it may be useful to support *not* mapping composite
5759      AnyValues to/from Binlogged server-ids.
5760   */
5761   uint32 logged_server_id= anyValue;
5762   ndbcluster_anyvalue_set_serverid(logged_server_id, originating_server_id);
5763 
5764   /*
5765      Get NdbApi transaction id for this event to put into Binlog
5766   */
5767   Ndb_binlog_extra_row_info extra_row_info;
5768   const uchar* extra_row_info_ptr = NULL;
5769   Uint16 erif_flags = 0;
5770   if (opt_ndb_log_transaction_id)
5771   {
5772     erif_flags |= Ndb_binlog_extra_row_info::NDB_ERIF_TRANSID;
5773     extra_row_info.setTransactionId(pOp->getTransId());
5774   }
5775 
5776   /* Set conflict flags member if necessary */
5777   Uint16 event_conflict_flags = 0;
5778   assert(! (reflected_op && refresh_op));
5779   if (reflected_op)
5780   {
5781     event_conflict_flags |= NDB_ERIF_CFT_REFLECT_OP;
5782   }
5783   else if (refresh_op)
5784   {
5785     event_conflict_flags |= NDB_ERIF_CFT_REFRESH_OP;
5786   }
5787   else if (read_op)
5788   {
5789     event_conflict_flags |= NDB_ERIF_CFT_READ_OP;
5790   }
5791 
5792   DBUG_EXECUTE_IF("ndb_injector_set_event_conflict_flags",
5793                   {
5794                     event_conflict_flags = 0xfafa;
5795                   });
5796   if (event_conflict_flags != 0)
5797   {
5798     erif_flags |= Ndb_binlog_extra_row_info::NDB_ERIF_CFT_FLAGS;
5799     extra_row_info.setConflictFlags(event_conflict_flags);
5800   }
5801 
5802   if (erif_flags != 0)
5803   {
5804     extra_row_info.setFlags(erif_flags);
5805     if (likely(!log_bin_use_v1_row_events))
5806     {
5807       extra_row_info_ptr = extra_row_info.generateBuffer();
5808     }
5809     else
5810     {
5811       /**
5812        * Can't put the metadata in a v1 event
5813        * Produce 1 warning at most
5814        */
5815       if (!g_injector_v1_warning_emitted)
5816       {
5817         sql_print_error("NDB: Binlog Injector discarding row event "
5818                         "meta data as server is using v1 row events. "
5819                         "(%u %x)",
5820                         opt_ndb_log_transaction_id,
5821                         event_conflict_flags);
5822 
5823         g_injector_v1_warning_emitted = true;
5824       }
5825     }
5826   }
5827 
5828   assert(trans.good());
5829   assert(table != 0);
5830 
5831   dbug_print_table("table", table);
5832 
5833   uint n_fields= table->s->fields;
5834   DBUG_PRINT("info", ("Assuming %u columns for table %s",
5835                       n_fields, table->s->table_name.str));
5836   MY_BITMAP b;
5837   my_bitmap_map bitbuf[(NDB_MAX_ATTRIBUTES_IN_TABLE +
5838                             8*sizeof(my_bitmap_map) - 1) /
5839                            (8*sizeof(my_bitmap_map))];
5840   bitmap_init(&b, bitbuf, n_fields, FALSE);
5841   bitmap_set_all(&b);
5842 
5843   /*
5844    row data is already in table->record[0]
5845    As we told the NdbEventOperation to do this
5846    (saves moving data about many times)
5847   */
5848 
5849   /*
5850     for now malloc/free blobs buffer each time
5851     TODO if possible share single permanent buffer with handlers
5852    */
5853   uchar* blobs_buffer[2] = { 0, 0 };
5854   uint blobs_buffer_size[2] = { 0, 0 };
5855 
5856   ndb_binlog_index_row *row=
5857     ndb_find_binlog_index_row(rows, originating_server_id, 0);
5858 
5859   switch(pOp->getEventType())
5860   {
5861   case NDBEVENT::TE_INSERT:
5862     if (likely( count_this_event ))
5863     {
5864       row->n_inserts++;
5865       trans_row_count++;
5866     }
5867     DBUG_PRINT("info", ("INSERT INTO %s.%s",
5868                         table->s->db.str, table->s->table_name.str));
5869     {
5870       int ret;
5871       if (share->flags & NSF_BLOB_FLAG)
5872       {
5873         my_ptrdiff_t ptrdiff= 0;
5874         ret = get_ndb_blobs_value(table, event_data->ndb_value[0],
5875                                   blobs_buffer[0],
5876                                   blobs_buffer_size[0],
5877                                   ptrdiff);
5878         assert(ret == 0);
5879       }
5880       ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
5881       ret = trans.write_row(logged_server_id,
5882                             injector::transaction::table(table, true),
5883                             &b, n_fields, table->record[0],
5884                             extra_row_info_ptr);
5885       assert(ret == 0);
5886     }
5887     break;
5888   case NDBEVENT::TE_DELETE:
5889     if (likely( count_this_event ))
5890     {
5891       row->n_deletes++;
5892       trans_row_count++;
5893     }
5894     DBUG_PRINT("info",("DELETE FROM %s.%s",
5895                        table->s->db.str, table->s->table_name.str));
5896     {
5897       /*
5898         table->record[0] contains only the primary key in this case
5899         since we do not have an after image
5900       */
5901       int n;
5902       if (!get_binlog_full(share) && table->s->primary_key != MAX_KEY)
5903         n= 0; /*
5904                 use the primary key only as it save time and space and
5905                 it is the only thing needed to log the delete
5906               */
5907       else
5908         n= 1; /*
5909                 we use the before values since we don't have a primary key
5910                 since the mysql server does not handle the hidden primary
5911                 key
5912               */
5913 
5914       int ret;
5915       if (share->flags & NSF_BLOB_FLAG)
5916       {
5917         my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
5918         ret = get_ndb_blobs_value(table, event_data->ndb_value[n],
5919                                   blobs_buffer[n],
5920                                   blobs_buffer_size[n],
5921                                   ptrdiff);
5922         assert(ret == 0);
5923       }
5924       ndb_unpack_record(table, event_data->ndb_value[n], &b, table->record[n]);
5925       DBUG_EXECUTE("info", print_records(table, table->record[n]););
5926       ret = trans.delete_row(logged_server_id,
5927                              injector::transaction::table(table, true),
5928                              &b, n_fields, table->record[n],
5929                              extra_row_info_ptr);
5930       assert(ret == 0);
5931     }
5932     break;
5933   case NDBEVENT::TE_UPDATE:
5934     if (likely( count_this_event ))
5935     {
5936       row->n_updates++;
5937       trans_row_count++;
5938     }
5939     DBUG_PRINT("info", ("UPDATE %s.%s",
5940                         table->s->db.str, table->s->table_name.str));
5941     {
5942       int ret;
5943       if (share->flags & NSF_BLOB_FLAG)
5944       {
5945         my_ptrdiff_t ptrdiff= 0;
5946         ret = get_ndb_blobs_value(table, event_data->ndb_value[0],
5947                                   blobs_buffer[0],
5948                                   blobs_buffer_size[0],
5949                                   ptrdiff);
5950         assert(ret == 0);
5951       }
5952       ndb_unpack_record(table, event_data->ndb_value[0],
5953                         &b, table->record[0]);
5954       DBUG_EXECUTE("info", print_records(table, table->record[0]););
5955       if (table->s->primary_key != MAX_KEY &&
5956           !get_binlog_use_update(share))
5957       {
5958         /*
5959           since table has a primary key, we can do a write
5960           using only after values
5961         */
5962         ret = trans.write_row(logged_server_id,
5963                               injector::transaction::table(table, true),
5964                               &b, n_fields, table->record[0],// after values
5965                               extra_row_info_ptr);
5966         assert(ret == 0);
5967       }
5968       else
5969       {
5970         /*
5971           mysql server cannot handle the ndb hidden key and
5972           therefore needs the before image as well
5973         */
5974         if (share->flags & NSF_BLOB_FLAG)
5975         {
5976           my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
5977           ret = get_ndb_blobs_value(table, event_data->ndb_value[1],
5978                                     blobs_buffer[1],
5979                                     blobs_buffer_size[1],
5980                                     ptrdiff);
5981           assert(ret == 0);
5982         }
5983         ndb_unpack_record(table, event_data->ndb_value[1], &b, table->record[1]);
5984         DBUG_EXECUTE("info", print_records(table, table->record[1]););
5985 
5986         MY_BITMAP col_bitmap_before_update;
5987         my_bitmap_map bitbuf[(NDB_MAX_ATTRIBUTES_IN_TABLE +
5988                                   8*sizeof(my_bitmap_map) - 1) /
5989                                  (8*sizeof(my_bitmap_map))];
5990         bitmap_init(&col_bitmap_before_update, bitbuf, n_fields, FALSE);
5991         if (get_binlog_update_minimal(share))
5992         {
5993           event_data->generate_minimal_bitmap(&col_bitmap_before_update, &b);
5994         }
5995         else
5996         {
5997           bitmap_copy(&col_bitmap_before_update, &b);
5998         }
5999 
6000         ret = trans.update_row(logged_server_id,
6001                                injector::transaction::table(table, true),
6002                                &col_bitmap_before_update, &b, n_fields,
6003                                table->record[1], // before values
6004                                table->record[0], // after values
6005                                extra_row_info_ptr);
6006         assert(ret == 0);
6007       }
6008     }
6009     break;
6010   default:
6011     /* We should REALLY never get here. */
6012     DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
6013     break;
6014   }
6015 
6016   if (share->flags & NSF_BLOB_FLAG)
6017   {
6018     my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
6019     my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
6020   }
6021 
6022   return 0;
6023 }
6024 
6025 
6026 /****************************************************************
6027   Injector thread main loop
6028 ****************************************************************/
6029 
6030 static void
remove_event_operations(Ndb * ndb)6031 remove_event_operations(Ndb* ndb)
6032 {
6033   DBUG_ENTER("remove_event_operations");
6034   NdbEventOperation *op;
6035   while ((op= ndb->getEventOperation()))
6036   {
6037     assert(!IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
6038     DBUG_PRINT("info", ("removing event operation on %s",
6039                         op->getEvent()->getName()));
6040 
6041     Ndb_event_data *event_data= (Ndb_event_data *) op->getCustomData();
6042     assert(event_data);
6043 
6044     NDB_SHARE *share= event_data->share;
6045     assert(share != NULL);
6046     assert(share->op == op || share->new_op == op);
6047 
6048     delete event_data;
6049     op->setCustomData(NULL);
6050 
6051     native_mutex_lock(&share->mutex);
6052     share->op= 0;
6053     share->new_op= 0;
6054     native_mutex_unlock(&share->mutex);
6055 
6056     DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
6057                              share->key_string(), share->use_count));
6058     free_share(&share);
6059 
6060     ndb->dropEventOperation(op);
6061   }
6062   DBUG_VOID_RETURN;
6063 }
6064 
6065 extern long long g_event_data_count;
6066 extern long long g_event_nondata_count;
6067 extern long long g_event_bytes_count;
6068 
updateInjectorStats(Ndb * schemaNdb,Ndb * dataNdb)6069 void updateInjectorStats(Ndb* schemaNdb, Ndb* dataNdb)
6070 {
6071   /* Update globals to sum of totals from each listening
6072    * Ndb object
6073    */
6074   g_event_data_count =
6075     schemaNdb->getClientStat(Ndb::DataEventsRecvdCount) +
6076     dataNdb->getClientStat(Ndb::DataEventsRecvdCount);
6077   g_event_nondata_count =
6078     schemaNdb->getClientStat(Ndb::NonDataEventsRecvdCount) +
6079     dataNdb->getClientStat(Ndb::NonDataEventsRecvdCount);
6080   g_event_bytes_count =
6081     schemaNdb->getClientStat(Ndb::EventBytesRecvdCount) +
6082     dataNdb->getClientStat(Ndb::EventBytesRecvdCount);
6083 }
6084 
6085 /**
6086    injectApplyStatusWriteRow
6087 
6088    Inject a WRITE_ROW event on the ndb_apply_status table into
6089    the Binlog.
6090    This contains our server_id and the supplied epoch number.
6091    When applied on the Slave it gives a transactional position
6092    marker
6093 */
6094 static
6095 bool
injectApplyStatusWriteRow(injector::transaction & trans,ulonglong gci)6096 injectApplyStatusWriteRow(injector::transaction& trans,
6097                           ulonglong gci)
6098 {
6099   DBUG_ENTER("injectApplyStatusWriteRow");
6100   if (ndb_apply_status_share == NULL)
6101   {
6102     sql_print_error("NDB: Could not get apply status share");
6103     assert(ndb_apply_status_share != NULL);
6104     DBUG_RETURN(false);
6105   }
6106 
6107   longlong gci_to_store = (longlong) gci;
6108 
6109 #ifndef NDEBUG
6110   DBUG_EXECUTE_IF("ndb_binlog_injector_cycle_gcis",
6111                   {
6112                     ulonglong gciHi = ((gci_to_store >> 32)
6113                                        & 0xffffffff);
6114                     ulonglong gciLo = (gci_to_store & 0xffffffff);
6115                     gciHi = (gciHi % 3);
6116                     sql_print_warning("NDB Binlog injector cycling gcis (%llu -> %llu)",
6117                                       gci_to_store, (gciHi << 32) + gciLo);
6118                     gci_to_store = (gciHi << 32) + gciLo;
6119                   });
6120   DBUG_EXECUTE_IF("ndb_binlog_injector_repeat_gcis",
6121                   {
6122                     ulonglong gciHi = ((gci_to_store >> 32)
6123                                        & 0xffffffff);
6124                     ulonglong gciLo = (gci_to_store & 0xffffffff);
6125                     gciHi=0xffffff00;
6126                     gciLo=0;
6127                     sql_print_warning("NDB Binlog injector repeating gcis (%llu -> %llu)",
6128                                       gci_to_store, (gciHi << 32) + gciLo);
6129                     gci_to_store = (gciHi << 32) + gciLo;
6130                   });
6131 #endif
6132 
6133   /* Build row buffer for generated ndb_apply_status
6134      WRITE_ROW event
6135      First get the relevant table structure.
6136   */
6137   assert(!ndb_apply_status_share->event_data);
6138   assert(ndb_apply_status_share->op);
6139   Ndb_event_data* event_data=
6140     (Ndb_event_data *) ndb_apply_status_share->op->getCustomData();
6141   assert(event_data);
6142   assert(event_data->shadow_table);
6143   TABLE* apply_status_table= event_data->shadow_table;
6144 
6145   /*
6146     Intialize apply_status_table->record[0]
6147 
6148     When iterating past the end of the last epoch, the first event of
6149     the new epoch may be on ndb_apply_status.  Its event data saved
6150     in record[0] would be overwritten here by a subsequent event on a
6151     normal table.  So save and restore its record[0].
6152   */
6153   static const ulong sav_max= 512; // current is 284
6154   const ulong sav_len= apply_status_table->s->reclength;
6155   assert(sav_len <= sav_max);
6156   uchar sav_buf[sav_max];
6157   memcpy(sav_buf, apply_status_table->record[0], sav_len);
6158   empty_record(apply_status_table);
6159 
6160   apply_status_table->field[0]->store((longlong)::server_id, true);
6161   apply_status_table->field[1]->store((longlong)gci_to_store, true);
6162   apply_status_table->field[2]->store("", 0, &my_charset_bin);
6163   apply_status_table->field[3]->store((longlong)0, true);
6164   apply_status_table->field[4]->store((longlong)0, true);
6165 #ifndef NDEBUG
6166   const LEX_STRING &name= apply_status_table->s->table_name;
6167   DBUG_PRINT("info", ("use_table: %.*s",
6168                       (int) name.length, name.str));
6169 #endif
6170   injector::transaction::table tbl(apply_status_table, true);
6171   int ret = trans.use_table(::server_id, tbl);
6172   assert(ret == 0); NDB_IGNORE_VALUE(ret);
6173 
6174   ret= trans.write_row(::server_id,
6175                        injector::transaction::table(apply_status_table,
6176                                                     true),
6177                        &apply_status_table->s->all_set,
6178                        apply_status_table->s->fields,
6179                        apply_status_table->record[0]);
6180 
6181   assert(ret == 0);
6182 
6183   memcpy(apply_status_table->record[0], sav_buf, sav_len);
6184   DBUG_RETURN(true);
6185 }
6186 
6187 
6188 extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
6189 extern ulong opt_ndb_report_thresh_binlog_mem_usage;
6190 extern ulong opt_ndb_eventbuffer_max_alloc;
6191 extern uint opt_ndb_eventbuffer_free_percent;
6192 
Ndb_binlog_thread()6193 Ndb_binlog_thread::Ndb_binlog_thread()
6194   : Ndb_component("Binlog")
6195 {
6196 }
6197 
6198 
~Ndb_binlog_thread()6199 Ndb_binlog_thread::~Ndb_binlog_thread()
6200 {
6201 }
6202 
6203 
do_wakeup()6204 void Ndb_binlog_thread::do_wakeup()
6205 {
6206   log_info("Wakeup");
6207 
6208   /*
6209     The binlog thread is normally waiting for another
6210     event from the cluster with short timeout and should
6211     soon(within 1 second) detect that stop has been requested.
6212 
6213     There are really no purpose(yet) to signal some condition
6214     trying to wake the thread up should it be waiting somewhere
6215     else since those waits are also short.
6216   */
6217 }
6218 
6219 
6220 void
do_run()6221 Ndb_binlog_thread::do_run()
6222 {
6223   THD *thd; /* needs to be first for thread_stack */
6224   Ndb *i_ndb= 0;
6225   Ndb *s_ndb= 0;
6226   Thd_ndb *thd_ndb=0;
6227   injector *inj= injector::instance();
6228   uint incident_id= 0;
6229   Global_THD_manager *thd_manager= Global_THD_manager::get_instance();
6230 
6231   enum { BCCC_running, BCCC_exit, BCCC_restart } binlog_thread_state;
6232 
6233   /**
6234    * If we get error after having reported incident
6235    *   but before binlog started...we do "Restarting Cluster Binlog"
6236    *   in that case, don't report incident again
6237    */
6238   bool do_incident = true;
6239 
6240   DBUG_ENTER("ndb_binlog_thread");
6241 
6242   native_mutex_lock(&injector_mutex);
6243 
6244   log_info("Starting...");
6245 
6246   thd= new THD; /* note that contructor of THD uses DBUG_ */
6247   THD_CHECK_SENTRY(thd);
6248 
6249   /* We need to set thd->thread_id before thd->store_globals, or it will
6250      set an invalid value for thd->variables.pseudo_thread_id.
6251   */
6252   thd->set_new_thread_id();
6253 
6254   thd->thread_stack= (char*) &thd; /* remember where our stack is */
6255   if (thd->store_globals())
6256   {
6257     delete thd;
6258     native_mutex_unlock(&injector_mutex);
6259     native_cond_signal(&injector_cond);
6260     DBUG_VOID_RETURN;
6261   }
6262 
6263   thd_set_command(thd, COM_DAEMON);
6264   thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
6265 #ifndef NDB_THD_HAS_NO_VERSION
6266   thd->version= refresh_version;
6267 #endif
6268   thd->get_protocol_classic()->set_client_capabilities(0);
6269   thd->security_context()->skip_grants();
6270   // Create thd->net vithout vio
6271   thd->get_protocol_classic()->init_net((st_vio *) 0);
6272 
6273   // Ndb binlog thread always use row format
6274   thd->set_current_stmt_binlog_format_row();
6275 
6276   thd->real_id= my_thread_self();
6277   thd_manager->add_thd(thd);
6278   thd->lex->start_transaction_opt= 0;
6279 
6280   log_info("Started");
6281 
6282   Ndb_schema_dist_data schema_dist_data;
6283 
6284 restart_cluster_failure:
6285   int have_injector_mutex_lock= 0;
6286   binlog_thread_state= BCCC_exit;
6287 
6288   log_verbose(1, "Setting up");
6289 
6290   if (!(thd_ndb= Thd_ndb::seize(thd)))
6291   {
6292     log_error("Creating Thd_ndb object failed");
6293     native_mutex_unlock(&injector_mutex);
6294     native_cond_signal(&injector_cond);
6295     goto err;
6296   }
6297 
6298   if (!(s_ndb= new Ndb(g_ndb_cluster_connection, NDB_REP_DB)) ||
6299       s_ndb->setNdbObjectName("Ndb Binlog schema change monitoring") ||
6300       s_ndb->init())
6301   {
6302     log_error("Creating schema Ndb object failed");
6303     native_mutex_unlock(&injector_mutex);
6304     native_cond_signal(&injector_cond);
6305     goto err;
6306   }
6307   log_info("Created schema Ndb object, reference: 0x%x, name: '%s'",
6308            s_ndb->getReference(), s_ndb->getNdbObjectName());
6309 
6310   // empty database
6311   if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
6312       i_ndb->setNdbObjectName("Ndb Binlog data change monitoring") ||
6313       i_ndb->init())
6314   {
6315     log_error("Creating injector Ndb object failed");
6316     native_mutex_unlock(&injector_mutex);
6317     native_cond_signal(&injector_cond);
6318     goto err;
6319   }
6320   log_info("Created injector Ndb object, reference: 0x%x, name: '%s'",
6321                       i_ndb->getReference(), i_ndb->getNdbObjectName());
6322 
6323   /* Set free percent event buffer needed to resume buffering */
6324   if (i_ndb->set_eventbuffer_free_percent(opt_ndb_eventbuffer_free_percent))
6325   {
6326     log_error("Setting ventbuffer free percent failed");
6327     native_mutex_unlock(&injector_mutex);
6328     native_cond_signal(&injector_cond);
6329     goto err;
6330   }
6331 
6332   log_verbose(10, "Exposing global references");
6333   /*
6334     Expose global reference to our ndb object.
6335 
6336     Used by both sql client thread and binlog thread to interact
6337     with the storage
6338     native_mutex_lock(&injector_mutex);
6339   */
6340   injector_thd= thd;
6341   injector_ndb= i_ndb;
6342   schema_ndb= s_ndb;
6343 
6344   if (opt_bin_log && opt_ndb_log_bin)
6345   {
6346     ndb_binlog_running= TRUE;
6347   }
6348 
6349   log_verbose(1, "Setup completed");
6350 
6351   /* Thread start up completed  */
6352   native_mutex_unlock(&injector_mutex);
6353   native_cond_signal(&injector_cond);
6354 
6355   log_verbose(1, "Wait for server start completed");
6356   /*
6357     wait for mysql server to start (so that the binlog is started
6358     and thus can receive the first GAP event)
6359   */
6360   mysql_mutex_lock(&LOCK_server_started);
6361   while (!mysqld_server_started)
6362   {
6363     struct timespec abstime;
6364     set_timespec(&abstime, 1);
6365     mysql_cond_timedwait(&COND_server_started, &LOCK_server_started,
6366                          &abstime);
6367     if (is_stop_requested())
6368     {
6369       mysql_mutex_unlock(&LOCK_server_started);
6370       goto err;
6371     }
6372   }
6373   mysql_mutex_unlock(&LOCK_server_started);
6374 
6375   // Defer call of THD::init_for_query until after mysqld_server_started
6376   // to ensure that the parts of MySQL Server it uses has been created
6377   thd->init_for_queries();
6378 
6379   log_verbose(1, "Check for incidents");
6380 
6381   while (do_incident && ndb_binlog_running)
6382   {
6383     /*
6384       check if it is the first log, if so we do not insert a GAP event
6385       as there is really no log to have a GAP in
6386     */
6387     if (incident_id == 0)
6388     {
6389       LOG_INFO log_info;
6390       mysql_bin_log.get_current_log(&log_info);
6391       int len=  (uint)strlen(log_info.log_file_name);
6392       uint no= 0;
6393       if ((sscanf(log_info.log_file_name + len - 6, "%u", &no) == 1) &&
6394           no == 1)
6395       {
6396         /* this is the fist log, so skip GAP event */
6397         break;
6398       }
6399     }
6400 
6401     /*
6402       Always insert a GAP event as we cannot know what has happened
6403       in the cluster while not being connected.
6404     */
6405     LEX_STRING const msg[2]=
6406       {
6407         { C_STRING_WITH_LEN("mysqld startup")    },
6408         { C_STRING_WITH_LEN("cluster disconnect")}
6409       };
6410     int ret = inj->record_incident(thd,
6411                                    binary_log::Incident_event::INCIDENT_LOST_EVENTS,
6412                                    msg[incident_id]);
6413     assert(ret == 0); NDB_IGNORE_VALUE(ret);
6414     do_incident = false; // Don't report incident again, unless we get started
6415     break;
6416   }
6417   incident_id= 1;
6418   {
6419     log_verbose(1, "Wait for cluster to start");
6420     thd->proc_info= "Waiting for ndbcluster to start";
6421 
6422     native_mutex_lock(&injector_mutex);
6423     while (!ndb_schema_share ||
6424            (ndb_binlog_running && !ndb_apply_status_share) ||
6425            !ndb_binlog_tables_inited)
6426     {
6427       if (!thd_ndb->valid_ndb())
6428       {
6429         /*
6430           Cluster has gone away before setup was completed.
6431           Keep lock on injector_mutex to prevent further
6432           usage of the injector_ndb, and restart binlog
6433           thread to get rid of any garbage on the ndb objects
6434         */
6435         have_injector_mutex_lock= 1;
6436         binlog_thread_state= BCCC_restart;
6437         goto err;
6438       }
6439       /* ndb not connected yet */
6440       struct timespec abstime;
6441       set_timespec(&abstime, 1);
6442       native_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
6443       if (is_stop_requested())
6444       {
6445         native_mutex_unlock(&injector_mutex);
6446         goto err;
6447       }
6448       if (thd->killed == THD::KILL_CONNECTION)
6449       {
6450         /*
6451           Since the ndb binlog thread adds itself to the "global thread list"
6452           it need to look at the "killed" flag and stop the thread to avoid
6453           that the server hangs during shutdown while waiting for the "global
6454           thread list" to be emtpy.
6455         */
6456         sql_print_information("NDB Binlog: Server shutdown detected while "
6457                               "waiting for ndbcluster to start...");
6458         native_mutex_unlock(&injector_mutex);
6459         goto err;
6460       }
6461     }
6462     native_mutex_unlock(&injector_mutex);
6463 
6464     assert(ndbcluster_hton->slot != ~(uint)0);
6465     thd_set_thd_ndb(thd, thd_ndb);
6466     thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
6467     thd->query_id= 0; // to keep valgrind quiet
6468   }
6469 
6470   schema_dist_data.init(g_ndb_cluster_connection);
6471 
6472   {
6473     log_verbose(1, "Wait for first event");
6474     // wait for the first event
6475     thd->proc_info= "Waiting for first event from ndbcluster";
6476     int schema_res, res;
6477     Uint64 schema_gci;
6478     do
6479     {
6480       DBUG_PRINT("info", ("Waiting for the first event"));
6481 
6482       if (is_stop_requested())
6483         goto err;
6484 
6485       schema_res= s_ndb->pollEvents(100, &schema_gci);
6486     } while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci);
6487     if (ndb_binlog_running)
6488     {
6489       Uint64 gci= i_ndb->getLatestGCI();
6490       while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch)
6491       {
6492         if (is_stop_requested())
6493           goto err;
6494         res= i_ndb->pollEvents(10, &gci);
6495       }
6496       if (gci > schema_gci)
6497       {
6498         schema_gci= gci;
6499       }
6500     }
6501     // now check that we have epochs consistant with what we had before the restart
6502     DBUG_PRINT("info", ("schema_res: %d  schema_gci: %u/%u", schema_res,
6503                         (uint)(schema_gci >> 32),
6504                         (uint)(schema_gci)));
6505     {
6506       i_ndb->flushIncompleteEvents(schema_gci);
6507       s_ndb->flushIncompleteEvents(schema_gci);
6508       if (schema_gci < ndb_latest_handled_binlog_epoch)
6509       {
6510         sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
6511                         "ndb_latest_handled_binlog_epoch: %u/%u, while current epoch: %u/%u. "
6512                         "RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.",
6513                         (uint)(ndb_latest_handled_binlog_epoch >> 32),
6514                         (uint)(ndb_latest_handled_binlog_epoch),
6515                         (uint)(schema_gci >> 32),
6516                         (uint)(schema_gci));
6517         ndb_set_latest_trans_gci(0);
6518         ndb_latest_handled_binlog_epoch= 0;
6519         ndb_latest_applied_binlog_epoch= 0;
6520         ndb_latest_received_binlog_epoch= 0;
6521         ndb_index_stat_restart();
6522       }
6523       else if (ndb_latest_applied_binlog_epoch > 0)
6524       {
6525         sql_print_warning("NDB Binlog: cluster has reconnected. "
6526                           "Changes to the database that occured while "
6527                           "disconnected will not be in the binlog");
6528       }
6529       if (opt_ndb_extra_logging)
6530       {
6531         sql_print_information("NDB Binlog: starting log at epoch %u/%u",
6532                               (uint)(schema_gci >> 32),
6533                               (uint)(schema_gci));
6534       }
6535     }
6536     log_verbose(1, "Got first event");
6537   }
6538   /*
6539     binlog thread is ready to receive events
6540     - client threads may now start updating data, i.e. tables are
6541     no longer read only
6542   */
6543   ndb_binlog_is_ready= TRUE;
6544 
6545   if (opt_ndb_extra_logging)
6546     sql_print_information("NDB Binlog: ndb tables writable");
6547   ndb_tdc_close_cached_tables();
6548 
6549   /*
6550      Signal any waiting thread that ndb table setup is
6551      now complete
6552   */
6553   ndb_notify_tables_writable();
6554 
6555   {
6556     static LEX_CSTRING db_lex_cstr= EMPTY_CSTR;
6557     thd->reset_db(db_lex_cstr);
6558   }
6559 
6560   log_verbose(1, "Startup and setup completed");
6561 
6562   /*
6563     Main NDB Injector loop
6564   */
6565   do_incident = true; // If we get disconnected again...do incident report
6566   binlog_thread_state= BCCC_running;
6567   for ( ; !((is_stop_requested() ||
6568              binlog_thread_state) &&
6569             ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci()) &&
6570           binlog_thread_state != BCCC_restart; )
6571   {
6572 #ifndef NDEBUG
6573     if (binlog_thread_state)
6574     {
6575       DBUG_PRINT("info", ("binlog_thread_state: %d, "
6576                           "ndb_latest_handled_binlog_epoch: %u/%u, "
6577                           "*get_latest_trans_gci(): %u/%u",
6578                           binlog_thread_state,
6579                           (uint)(ndb_latest_handled_binlog_epoch >> 32),
6580                           (uint)(ndb_latest_handled_binlog_epoch),
6581                           (uint)(ndb_get_latest_trans_gci() >> 32),
6582                           (uint)(ndb_get_latest_trans_gci())));
6583     }
6584 #endif
6585 
6586     /*
6587       now we don't want any events before next gci is complete
6588     */
6589     thd->proc_info= "Waiting for event from ndbcluster";
6590     thd->set_time();
6591 
6592     /* wait for event or 1000 ms */
6593     Uint64 gci= 0, schema_gci;
6594     int res= 0, tot_poll_wait= 1000;
6595 
6596     if (ndb_binlog_running)
6597     {
6598       // Capture any dynamic changes to max_alloc
6599       i_ndb->set_eventbuf_max_alloc(opt_ndb_eventbuffer_max_alloc);
6600 
6601       res= i_ndb->pollEvents(tot_poll_wait, &gci);
6602       tot_poll_wait= 0;
6603     }
6604     int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
6605     ndb_latest_received_binlog_epoch= gci;
6606 
6607     while (gci > schema_gci && schema_res >= 0)
6608     {
6609       static char buf[64];
6610       thd->proc_info= "Waiting for schema epoch";
6611       my_snprintf(buf, sizeof(buf), "%s %u/%u(%u/%u)", thd->proc_info,
6612                   (uint)(schema_gci >> 32),
6613                   (uint)(schema_gci),
6614                   (uint)(gci >> 32),
6615                   (uint)(gci));
6616       thd->proc_info= buf;
6617       schema_res= s_ndb->pollEvents(10, &schema_gci);
6618     }
6619 
6620     if ((is_stop_requested() ||
6621          binlog_thread_state) &&
6622         (ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci() ||
6623          !ndb_binlog_running))
6624       break; /* Stopping thread */
6625 
6626     if (thd->killed == THD::KILL_CONNECTION)
6627     {
6628       /*
6629         Since the ndb binlog thread adds itself to the "global thread list"
6630         it need to look at the "killed" flag and stop the thread to avoid
6631         that the server hangs during shutdown while waiting for the "global
6632         thread list" to be emtpy.
6633         In pre 5.6 versions the thread was also added to "global thread
6634         list" but the "global thread *count*" variable was not incremented
6635         and thus the same problem didn't exist.
6636         The only reason for adding the ndb binlog thread to "global thread
6637         list" is to be able to see the thread state using SHOW PROCESSLIST
6638         and I_S.PROCESSLIST
6639       */
6640       sql_print_information("NDB Binlog: Server shutdown detected...");
6641       break;
6642     }
6643 
6644     MEM_ROOT **root_ptr= my_thread_get_THR_MALLOC();
6645     MEM_ROOT *old_root= *root_ptr;
6646     MEM_ROOT mem_root;
6647     init_sql_alloc(PSI_INSTRUMENT_ME, &mem_root, 4096, 0);
6648 
6649     // The Ndb_schema_event_handler does not necessarily need
6650     // to use the same memroot(or vice versa)
6651     Ndb_schema_event_handler
6652       schema_event_handler(thd, &mem_root,
6653                            g_ndb_cluster_connection->node_id(),
6654                            schema_dist_data);
6655 
6656     *root_ptr= &mem_root;
6657 
6658     if (unlikely(schema_res > 0))
6659     {
6660       thd->proc_info= "Processing events from schema table";
6661       g_ndb_log_slave_updates= opt_log_slave_updates;
6662       s_ndb->
6663         setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
6664       s_ndb->
6665         setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
6666       NdbEventOperation *pOp= s_ndb->nextEvent();
6667       while (pOp != NULL)
6668       {
6669         if (!pOp->hasError())
6670         {
6671           schema_event_handler.handle_event(s_ndb, pOp);
6672 
6673           DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
6674                               s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
6675                               "<empty>"));
6676           DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
6677                               i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
6678                               "<empty>"));
6679           if (i_ndb->getEventOperation() == NULL &&
6680               s_ndb->getEventOperation() == NULL &&
6681               binlog_thread_state == BCCC_running)
6682           {
6683             DBUG_PRINT("info", ("binlog_thread_state= BCCC_restart"));
6684             binlog_thread_state= BCCC_restart;
6685             if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
6686             {
6687               sql_print_error("NDB Binlog: latest transaction in epoch %u/%u not in binlog "
6688                               "as latest received epoch is %u/%u",
6689                               (uint)(ndb_get_latest_trans_gci() >> 32),
6690                               (uint)(ndb_get_latest_trans_gci()),
6691                               (uint)(ndb_latest_received_binlog_epoch >> 32),
6692                               (uint)(ndb_latest_received_binlog_epoch));
6693             }
6694           }
6695         }
6696         else
6697           sql_print_error("NDB: error %lu (%s) on handling "
6698                           "binlog schema event",
6699                           (ulong) pOp->getNdbError().code,
6700                           pOp->getNdbError().message);
6701         pOp= s_ndb->nextEvent();
6702       }
6703       updateInjectorStats(s_ndb, i_ndb);
6704     }
6705 
6706     if (!ndb_binlog_running)
6707     {
6708       /*
6709         Just consume any events, not used if no binlogging
6710         e.g. node failure events
6711       */
6712       Uint64 tmp_gci;
6713       if (i_ndb->pollEvents(0, &tmp_gci))
6714       {
6715         NdbEventOperation *pOp;
6716         while ((pOp= i_ndb->nextEvent()))
6717         {
6718           if ((unsigned) pOp->getEventType() >=
6719               (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
6720           {
6721             ndb_binlog_index_row row;
6722             handle_non_data_event(thd, pOp, row);
6723           }
6724         }
6725         if (i_ndb->getEventOperation() == NULL &&
6726             s_ndb->getEventOperation() == NULL &&
6727             binlog_thread_state == BCCC_running)
6728         {
6729           DBUG_PRINT("info", ("binlog_thread_state= BCCC_restart"));
6730           binlog_thread_state= BCCC_restart;
6731         }
6732       }
6733       updateInjectorStats(s_ndb, i_ndb);
6734     }
6735     else if (res > 0 ||
6736              (ndb_log_empty_epochs() &&
6737               gci > ndb_latest_handled_binlog_epoch))
6738     {
6739       DBUG_PRINT("info", ("pollEvents res: %d", res));
6740       thd->proc_info= "Processing events";
6741       NdbEventOperation *pOp= i_ndb->nextEvent();
6742       ndb_binlog_index_row _row;
6743       ndb_binlog_index_row *rows= &_row;
6744       injector::transaction trans;
6745       unsigned trans_row_count= 0;
6746       unsigned trans_slave_row_count= 0;
6747       if (!pOp)
6748       {
6749         /*
6750           Must be an empty epoch since the condition
6751           (ndb_log_empty_epochs() &&
6752            gci > ndb_latest_handled_binlog_epoch)
6753           must be true we write empty epoch into
6754           ndb_binlog_index
6755         */
6756         DBUG_PRINT("info", ("Writing empty epoch for gci %llu", gci));
6757         DBUG_PRINT("info", ("Initializing transaction"));
6758         inj->new_trans(thd, &trans);
6759         rows= &_row;
6760         memset(&_row, 0, sizeof(_row));
6761         thd->variables.character_set_client= &my_charset_latin1;
6762         goto commit_to_binlog;
6763       }
6764       while (pOp != NULL)
6765       {
6766         rows= &_row;
6767         gci= pOp->getGCI();
6768         DBUG_PRINT("info", ("Handling gci: %u/%u",
6769                             (uint)(gci >> 32),
6770                             (uint)(gci)));
6771         // sometimes get TE_ALTER with invalid table
6772         assert(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
6773                ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
6774         assert(gci <= ndb_latest_received_binlog_epoch);
6775 
6776         /* Update our thread-local debug settings based on the global */
6777 #ifndef NDEBUG
6778         /* Get value of global...*/
6779         {
6780           char buf[256];
6781           DBUG_EXPLAIN_INITIAL(buf, sizeof(buf));
6782           //  fprintf(stderr, "Ndb Binlog Injector, setting debug to %s\n",
6783           //          buf);
6784           DBUG_SET(buf);
6785         }
6786 #endif
6787 
6788         /* initialize some variables for this epoch */
6789 
6790         i_ndb->set_eventbuf_max_alloc(opt_ndb_eventbuffer_max_alloc);
6791         g_ndb_log_slave_updates= opt_log_slave_updates;
6792         i_ndb->
6793           setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
6794         i_ndb->setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
6795 
6796         memset(&_row, 0, sizeof(_row));
6797         thd->variables.character_set_client= &my_charset_latin1;
6798         DBUG_PRINT("info", ("Initializing transaction"));
6799         inj->new_trans(thd, &trans);
6800         trans_row_count= 0;
6801         trans_slave_row_count= 0;
6802         // pass table map before epoch
6803         {
6804           Uint32 iter= 0;
6805           const NdbEventOperation *gci_op;
6806           Uint32 event_types;
6807 
6808           if (!i_ndb->isConsistentGCI(gci))
6809           {
6810             char errmsg[64];
6811             uint end= sprintf(&errmsg[0],
6812                               "Detected missing data in GCI %llu, "
6813                               "inserting GAP event", gci);
6814             errmsg[end]= '\0';
6815             DBUG_PRINT("info",
6816                        ("Detected missing data in GCI %llu, "
6817                         "inserting GAP event", gci));
6818             LEX_STRING const msg= { C_STRING_WITH_LEN(errmsg) };
6819             inj->record_incident(thd,
6820                                  binary_log::Incident_event::INCIDENT_LOST_EVENTS,
6821                                  msg);
6822           }
6823           while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
6824                  != NULL)
6825           {
6826             Ndb_event_data *event_data=
6827               (Ndb_event_data *) gci_op->getCustomData();
6828             NDB_SHARE *share= (event_data)?event_data->share:NULL;
6829             DBUG_PRINT("info", ("per gci_op: 0x%lx  share: 0x%lx  event_types: 0x%x",
6830                                 (long) gci_op, (long) share, event_types));
6831             // workaround for interface returning TE_STOP events
6832             // which are normally filtered out below in the nextEvent loop
6833             if ((event_types & ~NdbDictionary::Event::TE_STOP) == 0)
6834             {
6835               DBUG_PRINT("info", ("Skipped TE_STOP on table %s",
6836                                   gci_op->getEvent()->getTable()->getName()));
6837               continue;
6838             }
6839             // this should not happen
6840             if (share == NULL || event_data->shadow_table == NULL)
6841             {
6842               DBUG_PRINT("info", ("no share or table %s!",
6843                                   gci_op->getEvent()->getTable()->getName()));
6844               continue;
6845             }
6846             if (share == ndb_apply_status_share)
6847             {
6848               // skip this table, it is handled specially
6849               continue;
6850             }
6851             TABLE *table= event_data->shadow_table;
6852 #ifndef NDEBUG
6853             const LEX_STRING &name= table->s->table_name;
6854 #endif
6855             if ((event_types & (NdbDictionary::Event::TE_INSERT |
6856                                 NdbDictionary::Event::TE_UPDATE |
6857                                 NdbDictionary::Event::TE_DELETE)) == 0)
6858             {
6859               DBUG_PRINT("info", ("skipping non data event table: %.*s",
6860                                   (int) name.length, name.str));
6861               continue;
6862             }
6863             if (!trans.good())
6864             {
6865               DBUG_PRINT("info",
6866                          ("Found new data event, initializing transaction"));
6867               inj->new_trans(thd, &trans);
6868             }
6869             DBUG_PRINT("info", ("use_table: %.*s, cols %u",
6870                                 (int) name.length, name.str,
6871                                 table->s->fields));
6872             injector::transaction::table tbl(table, true);
6873             int ret = trans.use_table(::server_id, tbl);
6874             assert(ret == 0); NDB_IGNORE_VALUE(ret);
6875           }
6876         }
6877         if (trans.good())
6878         {
6879           /* Inject ndb_apply_status WRITE_ROW event */
6880           if (!injectApplyStatusWriteRow(trans,
6881                                          gci))
6882           {
6883             sql_print_error("NDB Binlog: Failed to inject apply status write row");
6884           }
6885         }
6886 
6887         do
6888         {
6889           if (pOp->hasError() &&
6890               handle_error(pOp) < 0)
6891             goto err;
6892 
6893 #ifndef NDEBUG
6894           {
6895             Ndb_event_data *event_data=
6896               (Ndb_event_data *) pOp->getCustomData();
6897             NDB_SHARE *share= (event_data)?event_data->share:NULL;
6898             DBUG_PRINT("info",
6899                        ("EVENT TYPE: %d  GCI: %u/%u last applied: %u/%u  "
6900                         "share: 0x%lx (%s.%s)", pOp->getEventType(),
6901                         (uint)(gci >> 32),
6902                         (uint)(gci),
6903                         (uint)(ndb_latest_applied_binlog_epoch >> 32),
6904                         (uint)(ndb_latest_applied_binlog_epoch),
6905                         (long) share,
6906                         share ? share->db :  "'NULL'",
6907                         share ? share->table_name : "'NULL'"));
6908             assert(share != 0);
6909           }
          // assert that there is consistency between the gci op list
          // and the event list
6912           {
6913             Uint32 iter= 0;
6914             const NdbEventOperation *gci_op;
6915             Uint32 event_types;
6916             while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
6917                    != NULL)
6918             {
6919               if (gci_op == pOp)
6920                 break;
6921             }
6922             assert(gci_op == pOp);
6923             assert((event_types & pOp->getEventType()) != 0);
6924           }
6925 #endif
6926 
6927           if ((unsigned) pOp->getEventType() <
6928               (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
6929             handle_data_event(thd, i_ndb, pOp, &rows, trans,
6930                               trans_row_count, trans_slave_row_count);
6931           else
6932           {
6933             handle_non_data_event(thd, pOp, *rows);
6934             DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
6935                                 s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
6936                                 "<empty>"));
6937             DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
6938                                 i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
6939                                 "<empty>"));
6940             if (i_ndb->getEventOperation() == NULL &&
6941                 s_ndb->getEventOperation() == NULL &&
6942                 binlog_thread_state == BCCC_running)
6943             {
6944               DBUG_PRINT("info", ("binlog_thread_state= BCCC_restart"));
6945               binlog_thread_state= BCCC_restart;
6946               if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
6947               {
6948                 sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
6949                                 "as latest received epoch is %lu",
6950                                 (ulong) ndb_get_latest_trans_gci(),
6951                                 (ulong) ndb_latest_received_binlog_epoch);
6952               }
6953             }
6954           }
6955 
6956           // Capture any dynamic changes to max_alloc
6957           i_ndb->set_eventbuf_max_alloc(opt_ndb_eventbuffer_max_alloc);
6958 
6959           pOp= i_ndb->nextEvent();
6960         } while (pOp && pOp->getGCI() == gci);
6961 
6962          updateInjectorStats(s_ndb, i_ndb);
6963 
6964         /*
          note! pOp is now referring to an event in the next epoch
          or is == 0
6967         */
6968 
6969         while (trans.good())
6970         {
6971       commit_to_binlog:
6972           if (!ndb_log_empty_epochs())
6973           {
6974             /*
6975               If
6976                 - We did not add any 'real' rows to the Binlog AND
6977                 - We did not apply any slave row updates, only
6978                   ndb_apply_status updates
6979               THEN
6980                 Don't write the Binlog transaction which just
6981                 contains ndb_apply_status updates.
                (For circular replication with log_apply_status,
                ndb_apply_status updates will propagate while some
                related, real update is propagating)
6985             */
6986             if ((trans_row_count == 0) &&
6987                 (! (opt_ndb_log_apply_status &&
6988                     trans_slave_row_count) ))
6989             {
6990               /* nothing to commit, rollback instead */
6991               if (int r= trans.rollback())
6992               {
6993                 sql_print_error("NDB Binlog: "
6994                                 "Error during ROLLBACK of GCI %u/%u. Error: %d",
6995                                 uint(gci >> 32), uint(gci), r);
6996                 /* TODO: Further handling? */
6997               }
6998               break;
6999             }
7000           }
7001           thd->proc_info= "Committing events to binlog";
7002           if (int r= trans.commit())
7003           {
7004             sql_print_error("NDB Binlog: "
7005                             "Error during COMMIT of GCI. Error: %d",
7006                             r);
7007             /* TODO: Further handling? */
7008           }
7009           injector::transaction::binlog_pos start= trans.start_pos();
7010           injector::transaction::binlog_pos next = trans.next_pos();
7011           rows->gci= (Uint32)(gci >> 32); // Expose gci hi/lo
7012           rows->epoch= gci;
7013           rows->start_master_log_file= start.file_name();
7014           rows->start_master_log_pos= start.file_pos();
7015           if ((next.file_pos() == 0) &&
7016               ndb_log_empty_epochs())
7017           {
7018             /* Empty transaction 'committed' due to log_empty_epochs
7019              * therefore no next position
7020              */
7021             rows->next_master_log_file= start.file_name();
7022             rows->next_master_log_pos= start.file_pos();
7023           }
7024           else
7025           {
7026             rows->next_master_log_file= next.file_name();
7027             rows->next_master_log_pos= next.file_pos();
7028           }
7029 
7030           DBUG_PRINT("info", ("COMMIT gci: %lu", (ulong) gci));
7031           if (opt_ndb_log_binlog_index)
7032           {
7033             if (ndb_binlog_index_table__write_rows(thd, rows))
7034             {
7035               /*
7036                  Writing to ndb_binlog_index failed, check if we are
7037                  being killed and retry
7038               */
7039               if (thd->killed)
7040               {
7041                 DBUG_PRINT("error", ("Failed to write to ndb_binlog_index at shutdown, retrying"));
7042                 mysql_mutex_lock(&thd->LOCK_thd_data);
7043                 const THD::killed_state save_killed= thd->killed;
7044                 /* We are cleaning up, allow for flushing last epoch */
7045                 thd->killed= THD::NOT_KILLED;
7046                 /* also clear error from last failing write */
7047                 thd->clear_error();
7048                 ndb_binlog_index_table__write_rows(thd, rows);
7049                 /* Restore kill flag */
7050                 thd->killed= save_killed;
7051                 mysql_mutex_unlock(&thd->LOCK_thd_data);
7052               }
7053             }
7054           }
7055           ndb_latest_applied_binlog_epoch= gci;
7056           break;
7057         }
7058         ndb_latest_handled_binlog_epoch= gci;
7059       }
7060 
7061       if(!i_ndb->isConsistent(gci))
7062       {
7063         char errmsg[64];
7064         uint end= sprintf(&errmsg[0],
7065                           "Detected missing data in GCI %llu, "
7066                           "inserting GAP event", gci);
7067         errmsg[end]= '\0';
7068         DBUG_PRINT("info",
7069                    ("Detected missing data in GCI %llu, "
7070                     "inserting GAP event", gci));
7071         LEX_STRING const msg= { C_STRING_WITH_LEN(errmsg) };
7072         inj->record_incident(thd,
7073                              binary_log::Incident_event::INCIDENT_LOST_EVENTS,
7074                              msg);
7075       }
7076     }
7077 
7078     // Notify the schema event handler about post_epoch so it may finish
7079     // any outstanding business
7080     schema_event_handler.post_epoch();
7081 
7082     free_root(&mem_root, MYF(0));
7083     *root_ptr= old_root;
7084     ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
7085   }
7086  err:
7087   if (binlog_thread_state != BCCC_restart)
7088   {
7089     log_info("Shutting down");
7090     thd->proc_info= "Shutting down";
7091   }
7092   else
7093   {
7094     log_info("Restarting");
7095     thd->proc_info= "Restarting";
7096   }
7097   if (!have_injector_mutex_lock)
7098     native_mutex_lock(&injector_mutex);
7099   /* don't mess with the injector_ndb anymore from other threads */
7100   injector_thd= 0;
7101   injector_ndb= 0;
7102   schema_ndb= 0;
7103   native_mutex_unlock(&injector_mutex);
7104   thd->reset_db(NULL_CSTR); // as not to try to free memory
7105 
7106   /*
7107     This will cause the util thread to start to try to initialize again
7108     via ndbcluster_setup_binlog_table_shares.  But since injector_ndb is
7109     set to NULL it will not succeed until injector_ndb is reinitialized.
7110   */
7111   ndb_binlog_tables_inited= FALSE;
7112 
7113   if (ndb_apply_status_share)
7114   {
7115     /* ndb_share reference binlog extra free */
7116     DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
7117                              ndb_apply_status_share->key_string(),
7118                              ndb_apply_status_share->use_count));
7119     free_share(&ndb_apply_status_share);
7120     ndb_apply_status_share= 0;
7121   }
7122   if (ndb_schema_share)
7123   {
7124     /* begin protect ndb_schema_share */
7125     native_mutex_lock(&ndb_schema_share_mutex);
7126     /* ndb_share reference binlog extra free */
7127     DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
7128                              ndb_schema_share->key_string(),
7129                              ndb_schema_share->use_count));
7130     free_share(&ndb_schema_share);
7131     ndb_schema_share= 0;
7132     native_mutex_unlock(&ndb_schema_share_mutex);
7133     /* end protect ndb_schema_share */
7134   }
7135 
7136   /* remove all event operations */
7137   if (s_ndb)
7138   {
7139     remove_event_operations(s_ndb);
7140     delete s_ndb;
7141     s_ndb= 0;
7142   }
7143   if (i_ndb)
7144   {
7145     remove_event_operations(i_ndb);
7146     delete i_ndb;
7147     i_ndb= 0;
7148   }
7149 
7150   if (thd_ndb)
7151   {
7152     Thd_ndb::release(thd_ndb);
7153     thd_set_thd_ndb(thd, NULL);
7154     thd_ndb= NULL;
7155   }
7156 
7157   /**
7158    * release all extra references from tables
7159    */
7160   {
7161     if (opt_ndb_extra_logging > 9)
7162       sql_print_information("NDB Binlog: Release extra share references");
7163 
7164     native_mutex_lock(&ndbcluster_mutex);
7165     for (uint i= 0; i < ndbcluster_open_tables.records;)
7166     {
7167       NDB_SHARE * share = (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables,
7168                                                       i);
7169       if (share->state != NSS_DROPPED)
7170       {
7171         /*
7172           The share kept by the server has not been freed, free it
7173         */
7174         ndbcluster_mark_share_dropped(share);
7175         /* ndb_share reference create free */
7176         DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
7177                                  share->key_string(), share->use_count));
7178         free_share(&share, TRUE);
7179 
7180         /**
7181          * This might have altered hash table...not sure if it's stable..
7182          *   so we'll restart instead
7183          */
7184         i = 0;
7185       }
7186       else
7187       {
7188         i++;
7189       }
7190     }
7191     native_mutex_unlock(&ndbcluster_mutex);
7192   }
7193   log_info("Stopping...");
7194 
7195   ndb_tdc_close_cached_tables();
7196   if (opt_ndb_extra_logging > 15)
7197   {
7198     sql_print_information("NDB Binlog: remaining open tables: ");
7199     for (uint i= 0; i < ndbcluster_open_tables.records; i++)
7200     {
7201       NDB_SHARE* share = (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables,i);
7202       sql_print_information("  %s.%s state: %u use_count: %u",
7203                             share->db,
7204                             share->table_name,
7205                             (uint)share->state,
7206                             share->use_count);
7207     }
7208   }
7209 
7210   schema_dist_data.release();
7211 
7212   if (binlog_thread_state == BCCC_restart)
7213   {
7214     native_mutex_lock(&injector_mutex);
7215     goto restart_cluster_failure;
7216   }
7217 
7218   // Release the thd->net created without vio
7219   thd->get_protocol_classic()->end_net();
7220   thd->release_resources();
7221   thd_manager->remove_thd(thd);
7222   delete thd;
7223 
7224   ndb_binlog_running= FALSE;
7225   (void) native_cond_signal(&injector_cond);
7226 
7227   log_info("Stopped");
7228 
7229   DBUG_PRINT("exit", ("ndb_binlog_thread"));
7230   DBUG_VOID_RETURN;
7231 }
7232 
7233 
7234 /*
7235   Return string containing current status of ndb binlog as
7236   comma separated name value pairs.
7237 
7238   Used by ndbcluster_show_status() to fill the "binlog" row
7239   in result of SHOW ENGINE NDB STATUS
7240 
7241   @param     buf     The buffer where to print status string
7242   @param     bufzies Size of the buffer
7243 
7244   @return    Length of the string printed to "buf" or 0 if no string
7245              is printed
7246 */
7247 
7248 size_t
ndbcluster_show_status_binlog(char * buf,size_t buf_size)7249 ndbcluster_show_status_binlog(char *buf, size_t buf_size)
7250 {
7251   DBUG_ENTER("ndbcluster_show_status_binlog");
7252 
7253   native_mutex_lock(&injector_mutex);
7254   if (injector_ndb)
7255   {
7256     const ulonglong latest_epoch= injector_ndb->getLatestGCI();
7257     native_mutex_unlock(&injector_mutex);
7258 
7259     // Get highest trans gci seen by the cluster connections
7260     const ulonglong latest_trans_epoch = ndb_get_latest_trans_gci();
7261 
7262     const size_t buf_len =
7263       my_snprintf(buf, buf_size,
7264                   "latest_epoch=%llu, "
7265                   "latest_trans_epoch=%llu, "
7266                   "latest_received_binlog_epoch=%llu, "
7267                   "latest_handled_binlog_epoch=%llu, "
7268                   "latest_applied_binlog_epoch=%llu",
7269                   latest_epoch,
7270                   latest_trans_epoch,
7271                   ndb_latest_received_binlog_epoch,
7272                   ndb_latest_handled_binlog_epoch,
7273                   ndb_latest_applied_binlog_epoch);
7274       DBUG_RETURN(buf_len);
7275   }
7276   else
7277     native_mutex_unlock(&injector_mutex);
7278   DBUG_RETURN(0);
7279 }
7280 
7281 
#ifdef NDB_WITHOUT_SERVER_ID_BITS

/*
  Built without support for --server-id-bits=<bits>: provide
  opt_server_id_mask as a fixed value with every bit set.
*/
ulong opt_server_id_mask= ~0UL;

#endif
7288