1 /*
2   Copyright (c) 2006, 2015, Oracle and/or its affiliates. All rights reserved.
3 
4   This program is free software; you can redistribute it and/or modify
5   it under the terms of the GNU General Public License as published by
6   the Free Software Foundation; version 2 of the License.
7 
8   This program is distributed in the hope that it will be useful,
9   but WITHOUT ANY WARRANTY; without even the implied warranty of
10   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11   GNU General Public License for more details.
12 
13   You should have received a copy of the GNU General Public License
14   along with this program; if not, write to the Free Software
15   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
16 */
17 
18 #include "sql_priv.h"
19 #include "unireg.h"         // REQUIRED: for other includes
20 #include "sql_show.h"
21 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
22 #include "ha_ndbcluster.h"
23 
24 #ifdef HAVE_NDB_BINLOG
25 #include "rpl_injector.h"
26 #include "rpl_filter.h"
27 #include "slave.h"
28 #include "log_event.h"
29 #include "ha_ndbcluster_binlog.h"
30 #include "NdbDictionary.hpp"
31 #include "ndb_cluster_connection.hpp"
32 #include <util/NdbAutoPtr.hpp>
33 
34 #include "sql_base.h"                           // close_thread_tables
35 #include "sql_table.h"                         // build_table_filename
36 #include "table.h"                             // open_table_from_share
37 #include "discover.h"                          // readfrm, writefrm
38 #include "lock.h"                              // MYSQL_LOCK_IGNORE_FLUSH,
39                                                // mysql_unlock_tables
40 #include "sql_parse.h"                         // mysql_parse
41 #include "transaction.h"
42 
43 #ifdef ndb_dynamite
44 #undef assert
45 #define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
46 #endif
47 
48 extern my_bool opt_ndb_log_binlog_index;
49 extern ulong opt_ndb_extra_logging;
50 /*
51   defines for cluster replication table names
52 */
53 #include "ha_ndbcluster_tables.h"
54 #define NDB_APPLY_TABLE_FILE "./" NDB_REP_DB "/" NDB_APPLY_TABLE
55 #define NDB_SCHEMA_TABLE_FILE "./" NDB_REP_DB "/" NDB_SCHEMA_TABLE
56 
57 /*
58   Timeout for syncing schema events between
59   mysql servers, and between mysql server and the binlog
60 */
61 static const int DEFAULT_SYNC_TIMEOUT= 120;
62 
63 
64 /*
65   Flag showing if the ndb injector thread is running, if so == 1
66   -1 if it was started but later stopped for some reason
67    0 if never started
68 */
69 static int ndb_binlog_thread_running= 0;
70 
71 /*
72   Flag showing if the ndb binlog should be created, if so == TRUE
73   FALSE if not
74 */
75 my_bool ndb_binlog_running= FALSE;
76 my_bool ndb_binlog_tables_inited= FALSE;
77 
/*
  Global reference to the ndb injector thread THD object

  Has one sole purpose, for setting the in_use table member variable
  in get_share(...)
*/
84 THD *injector_thd= 0;
85 
/*
  Global reference to the ndb injector Ndb object.

  Used mainly by the binlog index thread, but exposed to the client sql
  thread for one reason: to set up the event operations for a table
  so that the ndb injector thread can receive events.

  Must therefore always be used with a surrounding
  mysql_mutex_lock(&injector_mutex), when doing create/dropEventOperation
*/
96 static Ndb *injector_ndb= 0;
97 static Ndb *schema_ndb= 0;
98 
99 static int ndbcluster_binlog_inited= 0;
100 /*
101   Flag "ndbcluster_binlog_terminating" set when shutting down mysqld.
102   Server main loop should call handlerton function:
103 
104   ndbcluster_hton->binlog_func ==
105   ndbcluster_binlog_func(...,BFN_BINLOG_END,...) ==
106   ndbcluster_binlog_end
107 
108   at shutdown, which sets the flag. And then server needs to wait for it
109   to complete.  Otherwise binlog will not be complete.
110 
111   ndbcluster_hton->panic == ndbcluster_end() will not return until
112   ndb binlog is completed
113 */
114 static int ndbcluster_binlog_terminating= 0;
115 
116 /*
117   Mutex and condition used for interacting between client sql thread
118   and injector thread
119 */
120 pthread_t ndb_binlog_thread;
121 mysql_mutex_t injector_mutex;
122 mysql_cond_t  injector_cond;
123 
124 /* NDB Injector thread (used for binlog creation) */
125 static ulonglong ndb_latest_applied_binlog_epoch= 0;
126 static ulonglong ndb_latest_handled_binlog_epoch= 0;
127 static ulonglong ndb_latest_received_binlog_epoch= 0;
128 
129 NDB_SHARE *ndb_apply_status_share= 0;
130 NDB_SHARE *ndb_schema_share= 0;
131 mysql_mutex_t ndb_schema_share_mutex;
132 
133 extern my_bool opt_log_slave_updates;
134 static my_bool g_ndb_log_slave_updates;
135 
136 /* Schema object distribution handling */
137 HASH ndb_schema_objects;
138 typedef struct st_ndb_schema_object {
139   mysql_mutex_t mutex;
140   char *key;
141   uint key_length;
142   uint use_count;
143   MY_BITMAP slock_bitmap;
144   uint32 slock[256/32]; // 256 bits for lock status of table
145 } NDB_SCHEMA_OBJECT;
146 static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
147                                                 my_bool create_if_not_exists,
148                                                 my_bool have_lock);
149 static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
150                                    bool have_lock);
151 
152 static Uint64 *p_latest_trans_gci= 0;
153 
154 /*
155   Global variables for holding the ndb_binlog_index table reference
156 */
157 static TABLE *ndb_binlog_index= 0;
158 static TABLE_LIST binlog_tables;
159 
160 /*
161   Helper functions
162 */
163 
164 #ifndef DBUG_OFF
165 /* purecov: begin deadcode */
print_records(TABLE * table,const uchar * record)166 static void print_records(TABLE *table, const uchar *record)
167 {
168   for (uint j= 0; j < table->s->fields; j++)
169   {
170     char buf[40];
171     int pos= 0;
172     Field *field= table->field[j];
173     const uchar* field_ptr= field->ptr - table->record[0] + record;
174     int pack_len= field->pack_length();
175     int n= pack_len < 10 ? pack_len : 10;
176 
177     for (int i= 0; i < n && pos < 20; i++)
178     {
179       pos+= sprintf(&buf[pos]," %x", (int) (uchar) field_ptr[i]);
180     }
181     buf[pos]= 0;
182     DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
183   }
184 }
185 /* purecov: end */
186 #else
187 #define print_records(a,b)
188 #endif
189 
190 
191 #ifndef DBUG_OFF
dbug_print_table(const char * info,TABLE * table)192 static void dbug_print_table(const char *info, TABLE *table)
193 {
194   if (table == 0)
195   {
196     DBUG_PRINT("info",("%s: (null)", info));
197     return;
198   }
199   DBUG_PRINT("info",
200              ("%s: %s.%s s->fields: %d  "
201               "reclength: %lu  rec_buff_length: %u  record[0]: 0x%lx  "
202               "record[1]: 0x%lx",
203               info,
204               table->s->db.str,
205               table->s->table_name.str,
206               table->s->fields,
207               table->s->reclength,
208               table->s->rec_buff_length,
209               (long) table->record[0],
210               (long) table->record[1]));
211 
212   for (unsigned int i= 0; i < table->s->fields; i++)
213   {
214     Field *f= table->field[i];
215     DBUG_PRINT("info",
216                ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d  pack_length: %d  "
217                 "ptr: 0x%lx[+%d]  null_bit: %u  null_ptr: 0x%lx[+%d]",
218                 i,
219                 f->field_name,
220                 (long) f->flags,
221                 (f->flags & PRI_KEY_FLAG)  ? "pri"       : "attr",
222                 (f->flags & NOT_NULL_FLAG) ? ""          : ",nullable",
223                 (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
224                 (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
225                 (f->flags & BLOB_FLAG)     ? ",blob"     : "",
226                 (f->flags & BINARY_FLAG)   ? ",binary"   : "",
227                 f->real_type(),
228                 f->pack_length(),
229                 (long) f->ptr, (int) (f->ptr - table->record[0]),
230                 f->null_bit,
231                 (long) f->null_ptr,
232                 (int) ((uchar*) f->null_ptr - table->record[0])));
233     if (f->type() == MYSQL_TYPE_BIT)
234     {
235       Field_bit *g= (Field_bit*) f;
236       DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d  bit_ptr: 0x%lx[+%d] "
237                                    "bit_ofs: %d  bit_len: %u",
238                                    g->field_length, (long) g->bit_ptr,
239                                    (int) ((uchar*) g->bit_ptr -
240                                           table->record[0]),
241                                    g->bit_ofs, g->bit_len));
242     }
243   }
244 }
245 #else
246 #define dbug_print_table(a,b)
247 #endif
248 
249 
250 /*
251   Run a query through mysql_parse
252 
253   Used to:
254   - purging the ndb_binlog_index
255   - creating the ndb_apply_status table
256 */
run_query(THD * thd,char * buf,char * end,const int * no_print_error,my_bool disable_binlog)257 static void run_query(THD *thd, char *buf, char *end,
258                       const int *no_print_error, my_bool disable_binlog)
259 {
260   ulong save_thd_query_length= thd->query_length();
261   char *save_thd_query= thd->query();
262   ulong save_thread_id= thd->variables.pseudo_thread_id;
263   struct system_status_var save_thd_status_var= thd->status_var;
264   THD_TRANS save_thd_transaction_all= thd->transaction.all;
265   THD_TRANS save_thd_transaction_stmt= thd->transaction.stmt;
266   ulonglong save_thd_options= thd->variables.option_bits;
267   DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->variables.option_bits));
268   NET save_thd_net= thd->net;
269 
270   bzero((char*) &thd->net, sizeof(NET));
271   thd->set_query(buf, (uint) (end - buf));
272   thd->variables.pseudo_thread_id= thread_id;
273   thd->transaction.stmt.modified_non_trans_table= FALSE;
274   if (disable_binlog)
275     thd->variables.option_bits&= ~OPTION_BIN_LOG;
276 
277   DBUG_PRINT("query", ("%s", thd->query()));
278 
279   DBUG_ASSERT(!thd->in_sub_stmt);
280   DBUG_ASSERT(!thd->locked_tables_mode);
281 
282   {
283     Parser_state parser_state;
284     if (!parser_state.init(thd, thd->query(), thd->query_length()))
285       mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
286   }
287 
288   if (no_print_error && thd->is_slave_error)
289   {
290     int i;
291     Thd_ndb *thd_ndb= get_thd_ndb(thd);
292     for (i= 0; no_print_error[i]; i++)
293       if ((thd_ndb->m_error_code == no_print_error[i]) ||
294           (thd->stmt_da->sql_errno() == (unsigned) no_print_error[i]))
295         break;
296     if (!no_print_error[i])
297       sql_print_error("NDB: %s: error %s %d(ndb: %d) %d %d",
298                       buf,
299                       thd->stmt_da->message(),
300                       thd->stmt_da->sql_errno(),
301                       thd_ndb->m_error_code,
302                       (int) thd->is_error(), thd->is_slave_error);
303   }
304   /*
305     XXX: this code is broken. mysql_parse()/mysql_reset_thd_for_next_command()
306     can not be called from within a statement, and
307     run_query() can be called from anywhere, including from within
308     a sub-statement.
309     This particular reset is a temporary hack to avoid an assert
310     for double assignment of the diagnostics area when run_query()
311     is called from ndbcluster_reset_logs(), which is called from
312     mysql_flush().
313   */
314   thd->stmt_da->reset_diagnostics_area();
315 
316   thd->variables.option_bits= save_thd_options;
317   thd->set_query(save_thd_query, save_thd_query_length);
318   thd->variables.pseudo_thread_id= save_thread_id;
319   thd->status_var= save_thd_status_var;
320   thd->transaction.all= save_thd_transaction_all;
321   thd->transaction.stmt= save_thd_transaction_stmt;
322   thd->net= save_thd_net;
323   thd->set_current_stmt_binlog_format_row();
324 
325   if (thd == injector_thd)
326   {
327     /*
328       running the query will close all tables, including the ndb_binlog_index
329       used in injector_thd
330     */
331     ndb_binlog_index= 0;
332   }
333 }
334 
335 static void
ndbcluster_binlog_close_table(THD * thd,NDB_SHARE * share)336 ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
337 {
338   DBUG_ENTER("ndbcluster_binlog_close_table");
339   if (share->table_share)
340   {
341     closefrm(share->table, 1);
342     share->table_share= 0;
343     share->table= 0;
344   }
345   DBUG_ASSERT(share->table == 0);
346   DBUG_VOID_RETURN;
347 }
348 
349 
350 /*
351   Creates a TABLE object for the ndb cluster table
352 
353   NOTES
354     This does not open the underlying table
355 */
356 
357 static int
ndbcluster_binlog_open_table(THD * thd,NDB_SHARE * share,TABLE_SHARE * table_share,TABLE * table,int reopen)358 ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
359                              TABLE_SHARE *table_share, TABLE *table,
360                              int reopen)
361 {
362   int error;
363   DBUG_ENTER("ndbcluster_binlog_open_table");
364 
365   init_tmp_table_share(thd, table_share, share->db, 0, share->table_name,
366                        share->key);
367   if ((error= open_table_def(thd, table_share, 0)))
368   {
369     DBUG_PRINT("error", ("open_table_def failed: %d my_errno: %d", error, my_errno));
370     free_table_share(table_share);
371     DBUG_RETURN(error);
372   }
373   if ((error= open_table_from_share(thd, table_share, "", 0 /* fon't allocate buffers */,
374                                     (uint) READ_ALL, 0, table, FALSE)))
375   {
376     DBUG_PRINT("error", ("open_table_from_share failed %d my_errno: %d", error, my_errno));
377     free_table_share(table_share);
378     DBUG_RETURN(error);
379   }
380   mysql_mutex_lock(&LOCK_open);
381   assign_new_table_id(table_share);
382   mysql_mutex_unlock(&LOCK_open);
383 
384   if (!reopen)
385   {
386     // allocate memory on ndb share so it can be reused after online alter table
387     (void)multi_alloc_root(&share->mem_root,
388                            &(share->record[0]), table->s->rec_buff_length,
389                            &(share->record[1]), table->s->rec_buff_length,
390                            NULL);
391   }
392   {
393     my_ptrdiff_t row_offset= share->record[0] - table->record[0];
394     Field **p_field;
395     for (p_field= table->field; *p_field; p_field++)
396       (*p_field)->move_field_offset(row_offset);
397     table->record[0]= share->record[0];
398     table->record[1]= share->record[1];
399   }
400 
401   table->in_use= injector_thd;
402 
403   table->s->db.str= share->db;
404   table->s->db.length= strlen(share->db);
405   table->s->table_name.str= share->table_name;
406   table->s->table_name.length= strlen(share->table_name);
407 
408   DBUG_ASSERT(share->table_share == 0);
409   share->table_share= table_share;
410   DBUG_ASSERT(share->table == 0);
411   share->table= table;
412   /* We can't use 'use_all_columns()' as the file object is not setup yet */
413   table->column_bitmaps_set_no_signal(&table->s->all_set, &table->s->all_set);
414 #ifndef DBUG_OFF
415   dbug_print_table("table", table);
416 #endif
417   DBUG_RETURN(0);
418 }
419 
420 
421 /*
422   Initialize the binlog part of the NDB_SHARE
423 */
ndbcluster_binlog_init_share(NDB_SHARE * share,TABLE * _table)424 int ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
425 {
426   THD *thd= current_thd;
427   MEM_ROOT *mem_root= &share->mem_root;
428   int do_event_op= ndb_binlog_running;
429   int error= 0;
430   DBUG_ENTER("ndbcluster_binlog_init_share");
431 
432   share->connect_count= g_ndb_cluster_connection->get_connect_count();
433 
434   share->op= 0;
435   share->table= 0;
436 
437   if (!ndb_schema_share &&
438       strcmp(share->db, NDB_REP_DB) == 0 &&
439       strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
440     do_event_op= 1;
441   else if (!ndb_apply_status_share &&
442            strcmp(share->db, NDB_REP_DB) == 0 &&
443            strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
444     do_event_op= 1;
445 
446   {
447     int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
448     share->subscriber_bitmap= (MY_BITMAP*)
449       alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
450     for (i= 0; i < no_nodes; i++)
451     {
452       bitmap_init(&share->subscriber_bitmap[i],
453                   (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
454                   max_ndb_nodes, FALSE);
455       bitmap_clear_all(&share->subscriber_bitmap[i]);
456     }
457   }
458 
459   if (!do_event_op)
460   {
461     if (_table)
462     {
463       if (_table->s->primary_key == MAX_KEY)
464         share->flags|= NSF_HIDDEN_PK;
465       if (_table->s->blob_fields != 0)
466         share->flags|= NSF_BLOB_FLAG;
467     }
468     else
469     {
470       share->flags|= NSF_NO_BINLOG;
471     }
472     DBUG_RETURN(error);
473   }
474   while (1)
475   {
476     int error;
477     TABLE_SHARE *table_share= (TABLE_SHARE *) alloc_root(mem_root, sizeof(*table_share));
478     TABLE *table= (TABLE*) alloc_root(mem_root, sizeof(*table));
479     if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table, 0)))
480       break;
481     /*
482       ! do not touch the contents of the table
483       it may be in use by the injector thread
484     */
485     MEM_ROOT *mem_root= &share->mem_root;
486     share->ndb_value[0]= (NdbValue*)
487       alloc_root(mem_root, sizeof(NdbValue) *
488                  (table->s->fields + 2 /*extra for hidden key and part key*/));
489     share->ndb_value[1]= (NdbValue*)
490       alloc_root(mem_root, sizeof(NdbValue) *
491                  (table->s->fields + 2 /*extra for hidden key and part key*/));
492 
493     if (table->s->primary_key == MAX_KEY)
494       share->flags|= NSF_HIDDEN_PK;
495     if (table->s->blob_fields != 0)
496       share->flags|= NSF_BLOB_FLAG;
497     break;
498   }
499   DBUG_RETURN(error);
500 }
501 
502 /*****************************************************************
503   functions called from master sql client threads
504 ****************************************************************/
505 
506 /*
507   called in mysql_show_binlog_events and reset_logs to make sure we wait for
508   all events originating from this mysql server to arrive in the binlog
509 
510   Wait for the last epoch in which the last transaction is a part of.
511 
512   Wait a maximum of 30 seconds.
513 */
ndbcluster_binlog_wait(THD * thd)514 static void ndbcluster_binlog_wait(THD *thd)
515 {
516   if (ndb_binlog_running)
517   {
518     DBUG_ENTER("ndbcluster_binlog_wait");
519     const char *save_info= thd ? thd->proc_info : 0;
520     ulonglong wait_epoch= *p_latest_trans_gci;
521     int count= 30;
522     if (thd)
523       thd->proc_info= "Waiting for ndbcluster binlog update to "
524 	"reach current position";
525     while (count && ndb_binlog_running &&
526            ndb_latest_handled_binlog_epoch < wait_epoch)
527     {
528       count--;
529       sleep(1);
530     }
531     if (thd)
532       thd->proc_info= save_info;
533     DBUG_VOID_RETURN;
534   }
535 }
536 
537 /*
538  Called from MYSQL_BIN_LOG::reset_logs in log.cc when binlog is emptied
539 */
ndbcluster_reset_logs(THD * thd)540 static int ndbcluster_reset_logs(THD *thd)
541 {
542   if (!ndb_binlog_running)
543     return 0;
544 
545   DBUG_ENTER("ndbcluster_reset_logs");
546 
547   /*
548     Wait for all events orifinating from this mysql server has
549     reached the binlog before continuing to reset
550   */
551   ndbcluster_binlog_wait(thd);
552 
553   char buf[1024];
554   char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_REP_TABLE);
555 
556   run_query(thd, buf, end, NULL, TRUE);
557 
558   DBUG_RETURN(0);
559 }
560 
561 /*
562   Called from MYSQL_BIN_LOG::purge_logs in log.cc when the binlog "file"
563   is removed
564 */
565 
566 static int
ndbcluster_binlog_index_purge_file(THD * thd,const char * file)567 ndbcluster_binlog_index_purge_file(THD *thd, const char *file)
568 {
569   if (!ndb_binlog_running || thd->slave_thread)
570     return 0;
571 
572   DBUG_ENTER("ndbcluster_binlog_index_purge_file");
573   DBUG_PRINT("enter", ("file: %s", file));
574 
575   char buf[1024];
576   char *end= strmov(strmov(strmov(buf,
577                                   "DELETE FROM "
578                                   NDB_REP_DB "." NDB_REP_TABLE
579                                   " WHERE File='"), file), "'");
580 
581   run_query(thd, buf, end, NULL, TRUE);
582 
583   DBUG_RETURN(0);
584 }
585 
586 static void
ndbcluster_binlog_log_query(handlerton * hton,THD * thd,enum_binlog_command binlog_command,const char * query,uint query_length,const char * db,const char * table_name)587 ndbcluster_binlog_log_query(handlerton *hton, THD *thd, enum_binlog_command binlog_command,
588                             const char *query, uint query_length,
589                             const char *db, const char *table_name)
590 {
591   DBUG_ENTER("ndbcluster_binlog_log_query");
592   DBUG_PRINT("enter", ("db: %s  table_name: %s  query: %s",
593                        db, table_name, query));
594   enum SCHEMA_OP_TYPE type;
595   int log= 0;
596   switch (binlog_command)
597   {
598   case LOGCOM_CREATE_TABLE:
599     type= SOT_CREATE_TABLE;
600     DBUG_ASSERT(FALSE);
601     break;
602   case LOGCOM_ALTER_TABLE:
603     type= SOT_ALTER_TABLE;
604     log= 1;
605     break;
606   case LOGCOM_RENAME_TABLE:
607     type= SOT_RENAME_TABLE;
608     DBUG_ASSERT(FALSE);
609     break;
610   case LOGCOM_DROP_TABLE:
611     type= SOT_DROP_TABLE;
612     DBUG_ASSERT(FALSE);
613     break;
614   case LOGCOM_CREATE_DB:
615     type= SOT_CREATE_DB;
616     log= 1;
617     break;
618   case LOGCOM_ALTER_DB:
619     type= SOT_ALTER_DB;
620     log= 1;
621     break;
622   case LOGCOM_DROP_DB:
623     type= SOT_DROP_DB;
624     DBUG_ASSERT(FALSE);
625     break;
626   }
627   if (log)
628   {
629     ndbcluster_log_schema_op(thd, 0, query, query_length,
630                              db, table_name, 0, 0, type,
631                              0, 0);
632   }
633   DBUG_VOID_RETURN;
634 }
635 
636 
637 /*
638   End use of the NDB Cluster binlog
639    - wait for binlog thread to shutdown
640 */
641 
ndbcluster_binlog_end(THD * thd)642 static int ndbcluster_binlog_end(THD *thd)
643 {
644   DBUG_ENTER("ndbcluster_binlog_end");
645 
646   if (!ndbcluster_binlog_inited)
647     DBUG_RETURN(0);
648   ndbcluster_binlog_inited= 0;
649 
650 #ifdef HAVE_NDB_BINLOG
651   if (ndb_util_thread_running > 0)
652   {
653     /*
654       Wait for util thread to die (as this uses the injector mutex)
655       There is a very small change that ndb_util_thread dies and the
656       following mutex is freed before it's accessed. This shouldn't
657       however be a likely case as the ndbcluster_binlog_end is supposed to
658       be called before ndb_cluster_end().
659     */
660     mysql_mutex_lock(&LOCK_ndb_util_thread);
661     /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
662     ndb_util_thread_running++;
663     ndbcluster_terminating= 1;
664     mysql_cond_signal(&COND_ndb_util_thread);
665     while (ndb_util_thread_running > 1)
666       mysql_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
667     ndb_util_thread_running--;
668     mysql_mutex_unlock(&LOCK_ndb_util_thread);
669   }
670 
671   /* wait for injector thread to finish */
672   ndbcluster_binlog_terminating= 1;
673   mysql_mutex_lock(&injector_mutex);
674   mysql_cond_signal(&injector_cond);
675   while (ndb_binlog_thread_running > 0)
676     mysql_cond_wait(&injector_cond, &injector_mutex);
677   mysql_mutex_unlock(&injector_mutex);
678 
679   mysql_mutex_destroy(&injector_mutex);
680   mysql_cond_destroy(&injector_cond);
681   mysql_mutex_destroy(&ndb_schema_share_mutex);
682 #endif
683 
684   DBUG_RETURN(0);
685 }
686 
687 /*****************************************************************
688   functions called from slave sql client threads
689 ****************************************************************/
ndbcluster_reset_slave(THD * thd)690 static void ndbcluster_reset_slave(THD *thd)
691 {
692   if (!ndb_binlog_running)
693     return;
694 
695   DBUG_ENTER("ndbcluster_reset_slave");
696   char buf[1024];
697   char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
698   run_query(thd, buf, end, NULL, TRUE);
699   DBUG_VOID_RETURN;
700 }
701 
702 /*
703   Initialize the binlog part of the ndb handlerton
704 */
705 
706 /**
707   Upon the sql command flush logs, we need to ensure that all outstanding
708   ndb data to be logged has made it to the binary log to get a deterministic
709   behavior on the rotation of the log.
710  */
ndbcluster_flush_logs(handlerton * hton)711 static bool ndbcluster_flush_logs(handlerton *hton)
712 {
713   ndbcluster_binlog_wait(current_thd);
714   return FALSE;
715 }
716 
ndbcluster_binlog_func(handlerton * hton,THD * thd,enum_binlog_func fn,void * arg)717 static int ndbcluster_binlog_func(handlerton *hton, THD *thd,
718                                   enum_binlog_func fn,
719                                   void *arg)
720 {
721   switch(fn)
722   {
723   case BFN_RESET_LOGS:
724     ndbcluster_reset_logs(thd);
725     break;
726   case BFN_RESET_SLAVE:
727     ndbcluster_reset_slave(thd);
728     break;
729   case BFN_BINLOG_WAIT:
730     ndbcluster_binlog_wait(thd);
731     break;
732   case BFN_BINLOG_END:
733     ndbcluster_binlog_end(thd);
734     break;
735   case BFN_BINLOG_PURGE_FILE:
736     ndbcluster_binlog_index_purge_file(thd, (const char *)arg);
737     break;
738   }
739   return 0;
740 }
741 
ndbcluster_binlog_init_handlerton()742 void ndbcluster_binlog_init_handlerton()
743 {
744   handlerton *h= ndbcluster_hton;
745   h->flush_logs=       ndbcluster_flush_logs;
746   h->binlog_func=      ndbcluster_binlog_func;
747   h->binlog_log_query= ndbcluster_binlog_log_query;
748 }
749 
750 
751 
752 
753 
754 /*
755   check the availability af the ndb_apply_status share
756   - return share, but do not increase refcount
757   - return 0 if there is no share
758 */
ndbcluster_check_ndb_apply_status_share()759 static NDB_SHARE *ndbcluster_check_ndb_apply_status_share()
760 {
761   mysql_mutex_lock(&ndbcluster_mutex);
762 
763   void *share= my_hash_search(&ndbcluster_open_tables,
764                               (uchar*) NDB_APPLY_TABLE_FILE,
765                               sizeof(NDB_APPLY_TABLE_FILE) - 1);
766   DBUG_PRINT("info",("ndbcluster_check_ndb_apply_status_share %s 0x%lx",
767                      NDB_APPLY_TABLE_FILE, (long) share));
768   mysql_mutex_unlock(&ndbcluster_mutex);
769   return (NDB_SHARE*) share;
770 }
771 
772 /*
773   check the availability af the schema share
774   - return share, but do not increase refcount
775   - return 0 if there is no share
776 */
ndbcluster_check_ndb_schema_share()777 static NDB_SHARE *ndbcluster_check_ndb_schema_share()
778 {
779   mysql_mutex_lock(&ndbcluster_mutex);
780 
781   void *share= my_hash_search(&ndbcluster_open_tables,
782                               (uchar*) NDB_SCHEMA_TABLE_FILE,
783                               sizeof(NDB_SCHEMA_TABLE_FILE) - 1);
784   DBUG_PRINT("info",("ndbcluster_check_ndb_schema_share %s 0x%lx",
785                      NDB_SCHEMA_TABLE_FILE, (long) share));
786   mysql_mutex_unlock(&ndbcluster_mutex);
787   return (NDB_SHARE*) share;
788 }
789 
790 /*
791   Create the ndb_apply_status table
792 */
ndbcluster_create_ndb_apply_status_table(THD * thd)793 static int ndbcluster_create_ndb_apply_status_table(THD *thd)
794 {
795   DBUG_ENTER("ndbcluster_create_ndb_apply_status_table");
796 
797   /*
798     Check if we already have the apply status table.
799     If so it should have been discovered at startup
800     and thus have a share
801   */
802 
803   if (ndbcluster_check_ndb_apply_status_share())
804     DBUG_RETURN(0);
805 
806   if (g_ndb_cluster_connection->get_no_ready() <= 0)
807     DBUG_RETURN(0);
808 
809   char buf[1024 + 1], *end;
810 
811   if (opt_ndb_extra_logging)
812     sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_APPLY_TABLE);
813 
814   /*
815     Check if apply status table exists in MySQL "dictionary"
816     if so, remove it since there is none in Ndb
817   */
818   {
819     build_table_filename(buf, sizeof(buf) - 1,
820                          NDB_REP_DB, NDB_APPLY_TABLE, reg_ext, 0);
821     mysql_file_delete(key_file_frm, buf, MYF(0));
822   }
823 
824   /*
825     Note, updating this table schema must be reflected in ndb_restore
826   */
827   end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
828                    NDB_REP_DB "." NDB_APPLY_TABLE
829                    " ( server_id INT UNSIGNED NOT NULL,"
830                    " epoch BIGINT UNSIGNED NOT NULL, "
831                    " log_name VARCHAR(255) BINARY NOT NULL, "
832                    " start_pos BIGINT UNSIGNED NOT NULL, "
833                    " end_pos BIGINT UNSIGNED NOT NULL, "
834                    " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB CHARACTER SET latin1");
835 
836   const int no_print_error[6]= {ER_TABLE_EXISTS_ERROR,
837                                 701,
838                                 702,
839                                 721, // Table already exist
840                                 4009,
841                                 0}; // do not print error 701 etc
842   run_query(thd, buf, end, no_print_error, TRUE);
843 
844   DBUG_RETURN(0);
845 }
846 
847 
848 /*
849   Create the schema table
850 */
ndbcluster_create_schema_table(THD * thd)851 static int ndbcluster_create_schema_table(THD *thd)
852 {
853   DBUG_ENTER("ndbcluster_create_schema_table");
854 
855   /*
856     Check if we already have the schema table.
857     If so it should have been discovered at startup
858     and thus have a share
859   */
860 
861   if (ndbcluster_check_ndb_schema_share())
862     DBUG_RETURN(0);
863 
864   if (g_ndb_cluster_connection->get_no_ready() <= 0)
865     DBUG_RETURN(0);
866 
867   char buf[1024 + 1], *end;
868 
869   if (opt_ndb_extra_logging)
870     sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_SCHEMA_TABLE);
871 
872   /*
873     Check if schema table exists in MySQL "dictionary"
874     if so, remove it since there is none in Ndb
875   */
876   {
877     build_table_filename(buf, sizeof(buf) - 1,
878                          NDB_REP_DB, NDB_SCHEMA_TABLE, reg_ext, 0);
879     mysql_file_delete(key_file_frm, buf, MYF(0));
880   }
881 
882   /*
883     Update the defines below to reflect the table schema
884   */
885   end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
886                    NDB_REP_DB "." NDB_SCHEMA_TABLE
887                    " ( db VARBINARY(63) NOT NULL,"
888                    " name VARBINARY(63) NOT NULL,"
889                    " slock BINARY(32) NOT NULL,"
890                    " query BLOB NOT NULL,"
891                    " node_id INT UNSIGNED NOT NULL,"
892                    " epoch BIGINT UNSIGNED NOT NULL,"
893                    " id INT UNSIGNED NOT NULL,"
894                    " version INT UNSIGNED NOT NULL,"
895                    " type INT UNSIGNED NOT NULL,"
896                    " PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB CHARACTER SET latin1");
897 
898   const int no_print_error[6]= {ER_TABLE_EXISTS_ERROR,
899                                 701,
900                                 702,
901                                 721, // Table already exist
902                                 4009,
903                                 0}; // do not print error 701 etc
904   run_query(thd, buf, end, no_print_error, TRUE);
905 
906   DBUG_RETURN(0);
907 }
908 
ndbcluster_setup_binlog_table_shares(THD * thd)909 int ndbcluster_setup_binlog_table_shares(THD *thd)
910 {
911   if (!ndb_schema_share &&
912       ndbcluster_check_ndb_schema_share() == 0)
913   {
914     ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
915     if (!ndb_schema_share)
916     {
917       ndbcluster_create_schema_table(thd);
918       // always make sure we create the 'schema' first
919       if (!ndb_schema_share)
920         return 1;
921     }
922   }
923   if (!ndb_apply_status_share &&
924       ndbcluster_check_ndb_apply_status_share() == 0)
925   {
926     ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
927     if (!ndb_apply_status_share)
928     {
929       ndbcluster_create_ndb_apply_status_table(thd);
930       if (!ndb_apply_status_share)
931         return 1;
932     }
933   }
934   if (!ndbcluster_find_all_files(thd))
935   {
936     ndb_binlog_tables_inited= TRUE;
937     if (opt_ndb_extra_logging)
938       sql_print_information("NDB Binlog: ndb tables writable");
939     close_cached_tables(NULL, NULL, FALSE, LONG_TIMEOUT);
940     /* Signal injector thread that all is setup */
941     mysql_cond_signal(&injector_cond);
942   }
943   return 0;
944 }
945 
/*
  Defines and struct for schema table.
  Should reflect the table definition in the CREATE TABLE statement
  built by ndbcluster_create_schema_table().

  SCHEMA_<column>_I is the zero-based position of <column> in that
  statement, SCHEMA_SIZE is the number of columns and
  SCHEMA_SLOCK_SIZE is the size in bytes of the slock column
  (declared as BINARY(32)).
*/
#define SCHEMA_DB_I 0u
#define SCHEMA_NAME_I 1u
#define SCHEMA_SLOCK_I 2u
#define SCHEMA_QUERY_I 3u
#define SCHEMA_NODE_ID_I 4u
#define SCHEMA_EPOCH_I 5u
#define SCHEMA_ID_I 6u
#define SCHEMA_VERSION_I 7u
#define SCHEMA_TYPE_I 8u
#define SCHEMA_SIZE 9u
#define SCHEMA_SLOCK_SIZE 32u

/*
  In-memory copy of one row of the schema table, filled in by
  ndbcluster_get_schema().
*/
struct Cluster_schema
{
  /* number of bytes used in db, excluding the terminating 0 */
  uchar db_length;
  /* database name, zero terminated (column is VARBINARY(63)) */
  char db[64];
  /* number of bytes used in name, excluding the terminating 0 */
  uchar name_length;
  /* table name, zero terminated (column is VARBINARY(63)) */
  char name[64];
  /* length in bytes of the slock column */
  uchar slock_length;
  /* raw copy of the slock column */
  uint32 slock[SCHEMA_SLOCK_SIZE/4];
  /* length in bytes of the query blob */
  unsigned short query_length;
  /* copy of the query blob made with sql_strmake() */
  char *query;
  /* values of the epoch, node_id, id, version and type columns */
  Uint64 epoch;
  uint32 node_id;
  uint32 id;
  uint32 version;
  uint32 type;
  /* NOTE(review): not assigned by ndbcluster_get_schema(); presumably set by the caller - confirm */
  uint32 any_value;
};
979 
print_could_not_discover_error(THD * thd,const Cluster_schema * schema)980 static void print_could_not_discover_error(THD *thd,
981                                            const Cluster_schema *schema)
982 {
983   sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
984                   "binlog schema event '%s' from node %d. "
985                   "my_errno: %d",
986                    schema->db, schema->name, schema->query,
987                    schema->node_id, my_errno);
988   List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
989   MYSQL_ERROR *err;
990   while ((err= it++))
991     sql_print_warning("NDB Binlog: (%d)%s", err->get_sql_errno(),
992                       err->get_message_text());
993 }
994 
995 /*
996   Transfer schema table data into corresponding struct
997 */
ndbcluster_get_schema(NDB_SHARE * share,Cluster_schema * s)998 static void ndbcluster_get_schema(NDB_SHARE *share,
999                                   Cluster_schema *s)
1000 {
1001   TABLE *table= share->table;
1002   Field **field;
1003   /* unpack blob values */
1004   uchar* blobs_buffer= 0;
1005   uint blobs_buffer_size= 0;
1006   my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
1007   {
1008     ptrdiff_t ptrdiff= 0;
1009     int ret= get_ndb_blobs_value(table, share->ndb_value[0],
1010                                  blobs_buffer, blobs_buffer_size,
1011                                  ptrdiff);
1012     if (ret != 0)
1013     {
1014       my_free(blobs_buffer);
1015       DBUG_PRINT("info", ("blob read error"));
1016       DBUG_ASSERT(FALSE);
1017     }
1018   }
1019   /* db varchar 1 length uchar */
1020   field= table->field;
1021   s->db_length= *(uint8*)(*field)->ptr;
1022   DBUG_ASSERT(s->db_length <= (*field)->field_length);
1023   DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
1024   memcpy(s->db, (*field)->ptr + 1, s->db_length);
1025   s->db[s->db_length]= 0;
1026   /* name varchar 1 length uchar */
1027   field++;
1028   s->name_length= *(uint8*)(*field)->ptr;
1029   DBUG_ASSERT(s->name_length <= (*field)->field_length);
1030   DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
1031   memcpy(s->name, (*field)->ptr + 1, s->name_length);
1032   s->name[s->name_length]= 0;
1033   /* slock fixed length */
1034   field++;
1035   s->slock_length= (*field)->field_length;
1036   DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
1037   memcpy(s->slock, (*field)->ptr, s->slock_length);
1038   /* query blob */
1039   field++;
1040   {
1041     Field_blob *field_blob= (Field_blob*)(*field);
1042     uint blob_len= field_blob->get_length((*field)->ptr);
1043     uchar *blob_ptr= 0;
1044     field_blob->get_ptr(&blob_ptr);
1045     DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
1046     s->query_length= blob_len;
1047     s->query= sql_strmake((char*) blob_ptr, blob_len);
1048   }
1049   /* node_id */
1050   field++;
1051   s->node_id= ((Field_long *)*field)->val_int();
1052   /* epoch */
1053   field++;
1054   s->epoch= ((Field_long *)*field)->val_int();
1055   /* id */
1056   field++;
1057   s->id= ((Field_long *)*field)->val_int();
1058   /* version */
1059   field++;
1060   s->version= ((Field_long *)*field)->val_int();
1061   /* type */
1062   field++;
1063   s->type= ((Field_long *)*field)->val_int();
1064   /* free blobs buffer */
1065   my_free(blobs_buffer);
1066   dbug_tmp_restore_column_map(table->read_set, old_map);
1067 }
1068 
1069 /*
1070   helper function to pack a ndb varchar
1071 */
ndb_pack_varchar(const NDBCOL * col,char * buf,const char * str,int sz)1072 char *ndb_pack_varchar(const NDBCOL *col, char *buf,
1073                        const char *str, int sz)
1074 {
1075   switch (col->getArrayType())
1076   {
1077     case NDBCOL::ArrayTypeFixed:
1078       memcpy(buf, str, sz);
1079       break;
1080     case NDBCOL::ArrayTypeShortVar:
1081       *(uchar*)buf= (uchar)sz;
1082       memcpy(buf + 1, str, sz);
1083       break;
1084     case NDBCOL::ArrayTypeMediumVar:
1085       int2store(buf, sz);
1086       memcpy(buf + 2, str, sz);
1087       break;
1088   }
1089   return buf;
1090 }
1091 
/*
  Acknowledge handling of schema operation

  SYNOPSIS
    ndbcluster_update_slock()
    thd          thread handle, used to get the Ndb object and to
                 receive a warning on failure
    db           database part of the schema table row key
    table_name   name part of the schema table row key

  DESCRIPTION
    Reads the slock bitmap of the row (db, table_name) in the schema
    table with an exclusive lock, clears the bit of this node and
    writes the bitmap back together with node_id and type
    SOT_CLEAR_SLOCK, all in one transaction. The transaction is
    retried (at most 100 times) as long as the reported error is
    temporary; any other failure is pushed as a warning on thd.

  RETURN
    Always 0, also when the update failed or ndb_schema_share is not
    set.
*/
static int
ndbcluster_update_slock(THD *thd,
                        const char *db,
                        const char *table_name)
{
  DBUG_ENTER("ndbcluster_update_slock");
  if (!ndb_schema_share)
  {
    DBUG_RETURN(0);
  }

  const NdbError *ndb_error= 0;
  uint32 node_id= g_ndb_cluster_connection->node_id();
  Ndb *ndb= check_ndb_in_thd(thd);
  /* remember the current database, it is restored before returning */
  char save_db[FN_HEADLEN];
  strcpy(save_db, ndb->getDatabaseName());

  /* scratch buffer for the packed key values */
  char tmp_buf[FN_REFLEN];
  NDBDICT *dict= ndb->getDictionary();
  ndb->setDatabaseName(NDB_REP_DB);
  Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
  const NDBTAB *ndbtab= ndbtab_g.get_table();
  NdbTransaction *trans= 0;
  int retries= 100;
  /*
    NOTE(review): passed unscaled to my_sleep() below; confirm that
    my_sleep() really takes milliseconds.
  */
  int retry_sleep= 10; /* 10 milliseconds, transaction */
  const NDBCOL *col[SCHEMA_SIZE];
  /* column lengths, only used in the DBUG_ASSERT below */
  unsigned sz[SCHEMA_SIZE];

  /* bitmap on top of bitbuf; bitbuf receives the slock column value */
  MY_BITMAP slock;
  uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
  bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);

  if (ndbtab == 0)
  {
    /* the schema table could not be retrieved from the dictionary */
    abort();
    DBUG_RETURN(0);
  }

  {
    uint i;
    for (i= 0; i < SCHEMA_SIZE; i++)
    {
      col[i]= ndbtab->getColumn(i);
      /* the length of the query (blob) column is not checked */
      if (i != SCHEMA_QUERY_I)
      {
        sz[i]= col[i]->getLength();
        DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
      }
    }
  }

  /* each iteration is one attempt to run the complete transaction */
  while (1)
  {
    if ((trans= ndb->startTransaction()) == 0)
      goto err;
    {
      NdbOperation *op= 0;
      int r= 0;

      /* read the bitmap exclusive */
      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
      DBUG_ASSERT(r == 0);
      r|= op->readTupleExclusive();
      DBUG_ASSERT(r == 0);

      /* db */
      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
      r|= op->equal(SCHEMA_DB_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* name */
      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
                       strlen(table_name));
      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* slock */
      r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
      DBUG_ASSERT(r == 0);
    }
    /* execute the read but keep the transaction open for the update */
    if (trans->execute(NdbTransaction::NoCommit))
      goto err;
    /* acknowledge: remove our own node from the bitmap just read */
    bitmap_clear_bit(&slock, node_id);
    {
      NdbOperation *op= 0;
      int r= 0;

      /* now update the tuple */
      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
      DBUG_ASSERT(r == 0);
      r|= op->updateTuple();
      DBUG_ASSERT(r == 0);

      /* db */
      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
      r|= op->equal(SCHEMA_DB_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* name */
      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
                       strlen(table_name));
      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
      DBUG_ASSERT(r == 0);
      /* slock */
      r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
      DBUG_ASSERT(r == 0);
      /* node_id */
      r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
      DBUG_ASSERT(r == 0);
      /* type */
      r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
      DBUG_ASSERT(r == 0);
    }
    if (trans->execute(NdbTransaction::Commit) == 0)
    {
      dict->forceGCPWait();
      DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
                          node_id, db, table_name));
      break;
    }
  err:
    /* without a transaction the error is taken from the Ndb object */
    const NdbError *this_error= trans ?
      &trans->getNdbError() : &ndb->getNdbError();
    if (this_error->status == NdbError::TemporaryError)
    {
      if (retries--)
      {
        if (trans)
          ndb->closeTransaction(trans);
        my_sleep(retry_sleep);
        continue; // retry
      }
    }
    /* permanent error or out of retries: give up and report below */
    ndb_error= this_error;
    break;
  }

  if (ndb_error)
  {
    char buf[1024];
    my_snprintf(buf, sizeof(buf), "Could not release lock on '%s.%s'",
                db, table_name);
    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                        ndb_error->code, ndb_error->message, buf);
  }
  /* trans is still open here both after success and after giving up */
  if (trans)
    ndb->closeTransaction(trans);
  ndb->setDatabaseName(save_db);
  DBUG_RETURN(0);
}
1243 
1244 /*
1245   log query in schema table
1246 */
ndb_report_waiting(const char * key,int the_time,const char * op,const char * obj)1247 static void ndb_report_waiting(const char *key,
1248                                int the_time,
1249                                const char *op,
1250                                const char *obj)
1251 {
1252   ulonglong ndb_latest_epoch= 0;
1253   const char *proc_info= "<no info>";
1254   mysql_mutex_lock(&injector_mutex);
1255   if (injector_ndb)
1256     ndb_latest_epoch= injector_ndb->getLatestGCI();
1257   if (injector_thd)
1258     proc_info= injector_thd->proc_info;
1259   mysql_mutex_unlock(&injector_mutex);
1260   sql_print_information("NDB %s:"
1261                         " waiting max %u sec for %s %s."
1262                         "  epochs: (%u,%u,%u)"
1263                         "  injector proc_info: %s"
1264                         ,key, the_time, op, obj
1265                         ,(uint)ndb_latest_handled_binlog_epoch
1266                         ,(uint)ndb_latest_received_binlog_epoch
1267                         ,(uint)ndb_latest_epoch
1268                         ,proc_info
1269                         );
1270 }
1271 
ndbcluster_log_schema_op(THD * thd,NDB_SHARE * share,const char * query,int query_length,const char * db,const char * table_name,uint32 ndb_table_id,uint32 ndb_table_version,enum SCHEMA_OP_TYPE type,const char * new_db,const char * new_table_name)1272 int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
1273                              const char *query, int query_length,
1274                              const char *db, const char *table_name,
1275                              uint32 ndb_table_id,
1276                              uint32 ndb_table_version,
1277                              enum SCHEMA_OP_TYPE type,
1278                              const char *new_db, const char *new_table_name)
1279 {
1280   DBUG_ENTER("ndbcluster_log_schema_op");
1281   Thd_ndb *thd_ndb= get_thd_ndb(thd);
1282   if (!thd_ndb)
1283   {
1284     if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
1285     {
1286       sql_print_error("Could not allocate Thd_ndb object");
1287       DBUG_RETURN(1);
1288     }
1289     set_thd_ndb(thd, thd_ndb);
1290   }
1291 
1292   DBUG_PRINT("enter",
1293              ("query: %s  db: %s  table_name: %s  thd_ndb->options: %d",
1294               query, db, table_name, thd_ndb->options));
1295   if (!ndb_schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
1296   {
1297     DBUG_RETURN(0);
1298   }
1299 
1300   char tmp_buf2[FN_REFLEN];
1301   char quoted_table1[2 + 2 * FN_REFLEN + 1];
1302   char quoted_db1[2 + 2 * FN_REFLEN + 1];
1303   char quoted_db2[2 + 2 * FN_REFLEN + 1];
1304   char quoted_table2[2 + 2 * FN_REFLEN + 1];
1305   int id_length= 0;
1306   const char *type_str;
1307   switch (type)
1308   {
1309   case SOT_DROP_TABLE:
1310     /* drop database command, do not log at drop table */
1311     if (thd->lex->sql_command ==  SQLCOM_DROP_DB)
1312       DBUG_RETURN(0);
1313     /* redo the drop table query as is may contain several tables */
1314     query= tmp_buf2;
1315     id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
1316                                             table_name, 0);
1317     quoted_table1[id_length]= '\0';
1318     query_length= (uint) (strxmov(tmp_buf2, "drop table ",
1319                                   quoted_table1, NullS) - tmp_buf2);
1320     type_str= "drop table";
1321     break;
1322   case SOT_RENAME_TABLE:
1323     /* redo the rename table query as is may contain several tables */
1324     query= tmp_buf2;
1325     id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db1,
1326                                             db, 0);
1327     quoted_db1[id_length]= '\0';
1328     id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
1329                                             table_name, 0);
1330     quoted_table1[id_length]= '\0';
1331     id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db2,
1332                                             new_db, 0);
1333     quoted_db2[id_length]= '\0';
1334     id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table2,
1335                                             new_table_name, 0);
1336     quoted_table2[id_length]= '\0';
1337     query_length= (uint) (strxmov(tmp_buf2, "rename table ",
1338                                   quoted_db1, ".", quoted_table1, " to ",
1339                                   quoted_db2, ".", quoted_table2, NullS) - tmp_buf2);
1340     type_str= "rename table";
1341     break;
1342   case SOT_CREATE_TABLE:
1343     type_str= "create table";
1344     break;
1345   case SOT_ALTER_TABLE:
1346     type_str= "alter table";
1347     break;
1348   case SOT_DROP_DB:
1349     type_str= "drop db";
1350     break;
1351   case SOT_CREATE_DB:
1352     type_str= "create db";
1353     break;
1354   case SOT_ALTER_DB:
1355     type_str= "alter db";
1356     break;
1357   case SOT_TABLESPACE:
1358     type_str= "tablespace";
1359     break;
1360   case SOT_LOGFILE_GROUP:
1361     type_str= "logfile group";
1362     break;
1363   case SOT_TRUNCATE_TABLE:
1364     type_str= "truncate table";
1365     break;
1366   default:
1367     abort(); /* should not happen, programming error */
1368   }
1369 
1370   NDB_SCHEMA_OBJECT *ndb_schema_object;
1371   {
1372     char key[FN_REFLEN + 1];
1373     build_table_filename(key, sizeof(key) - 1, db, table_name, "", 0);
1374     ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE);
1375   }
1376 
1377   const NdbError *ndb_error= 0;
1378   uint32 node_id= g_ndb_cluster_connection->node_id();
1379   Uint64 epoch= 0;
1380   MY_BITMAP schema_subscribers;
1381   uint32 bitbuf[sizeof(ndb_schema_object->slock)/4];
1382   char bitbuf_e[sizeof(bitbuf)];
1383   bzero(bitbuf_e, sizeof(bitbuf_e));
1384   {
1385     int i, updated= 0;
1386     int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
1387     bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, FALSE);
1388     bitmap_set_all(&schema_subscribers);
1389 
1390     /* begin protect ndb_schema_share */
1391     mysql_mutex_lock(&ndb_schema_share_mutex);
1392     if (ndb_schema_share == 0)
1393     {
1394       mysql_mutex_unlock(&ndb_schema_share_mutex);
1395       if (ndb_schema_object)
1396         ndb_free_schema_object(&ndb_schema_object, FALSE);
1397       DBUG_RETURN(0);
1398     }
1399     mysql_mutex_lock(&ndb_schema_share->mutex);
1400     for (i= 0; i < no_storage_nodes; i++)
1401     {
1402       MY_BITMAP *table_subscribers= &ndb_schema_share->subscriber_bitmap[i];
1403       if (!bitmap_is_clear_all(table_subscribers))
1404       {
1405         bitmap_intersect(&schema_subscribers,
1406                          table_subscribers);
1407         updated= 1;
1408       }
1409     }
1410     mysql_mutex_unlock(&ndb_schema_share->mutex);
1411     mysql_mutex_unlock(&ndb_schema_share_mutex);
1412     /* end protect ndb_schema_share */
1413 
1414     if (updated)
1415     {
1416       bitmap_clear_bit(&schema_subscribers, node_id);
1417       /*
1418         if setting own acknowledge bit it is important that
1419         no other mysqld's are registred, as subsequent code
1420         will cause the original event to be hidden (by blob
1421         merge event code)
1422       */
1423       if (bitmap_is_clear_all(&schema_subscribers))
1424           bitmap_set_bit(&schema_subscribers, node_id);
1425     }
1426     else
1427       bitmap_clear_all(&schema_subscribers);
1428 
1429     if (ndb_schema_object)
1430     {
1431       mysql_mutex_lock(&ndb_schema_object->mutex);
1432       memcpy(ndb_schema_object->slock, schema_subscribers.bitmap,
1433              sizeof(ndb_schema_object->slock));
1434       mysql_mutex_unlock(&ndb_schema_object->mutex);
1435     }
1436 
1437     DBUG_DUMP("schema_subscribers", (uchar*)schema_subscribers.bitmap,
1438               no_bytes_in_map(&schema_subscribers));
1439     DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d",
1440                         bitmap_is_clear_all(&schema_subscribers)));
1441   }
1442 
1443   Ndb *ndb= thd_ndb->ndb;
1444   char save_db[FN_REFLEN];
1445   strcpy(save_db, ndb->getDatabaseName());
1446 
1447   char tmp_buf[FN_REFLEN];
1448   NDBDICT *dict= ndb->getDictionary();
1449   ndb->setDatabaseName(NDB_REP_DB);
1450   Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
1451   const NDBTAB *ndbtab= ndbtab_g.get_table();
1452   NdbTransaction *trans= 0;
1453   int retries= 100;
1454   int retry_sleep= 10; /* 10 milliseconds, transaction */
1455   const NDBCOL *col[SCHEMA_SIZE];
1456   unsigned sz[SCHEMA_SIZE];
1457 
1458   if (ndbtab == 0)
1459   {
1460     if (strcmp(NDB_REP_DB, db) != 0 ||
1461         strcmp(NDB_SCHEMA_TABLE, table_name))
1462     {
1463       ndb_error= &dict->getNdbError();
1464     }
1465     goto end;
1466   }
1467 
1468   {
1469     uint i;
1470     for (i= 0; i < SCHEMA_SIZE; i++)
1471     {
1472       col[i]= ndbtab->getColumn(i);
1473       if (i != SCHEMA_QUERY_I)
1474       {
1475         sz[i]= col[i]->getLength();
1476         DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
1477       }
1478     }
1479   }
1480 
1481   while (1)
1482   {
1483     const char *log_db= db;
1484     const char *log_tab= table_name;
1485     const char *log_subscribers= (char*)schema_subscribers.bitmap;
1486     uint32 log_type= (uint32)type;
1487     if ((trans= ndb->startTransaction()) == 0)
1488       goto err;
1489     while (1)
1490     {
1491       NdbOperation *op= 0;
1492       int r= 0;
1493       r|= (op= trans->getNdbOperation(ndbtab)) == 0;
1494       DBUG_ASSERT(r == 0);
1495       r|= op->writeTuple();
1496       DBUG_ASSERT(r == 0);
1497 
1498       /* db */
1499       ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db));
1500       r|= op->equal(SCHEMA_DB_I, tmp_buf);
1501       DBUG_ASSERT(r == 0);
1502       /* name */
1503       ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
1504                        strlen(log_tab));
1505       r|= op->equal(SCHEMA_NAME_I, tmp_buf);
1506       DBUG_ASSERT(r == 0);
1507       /* slock */
1508       DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf));
1509       r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
1510       DBUG_ASSERT(r == 0);
1511       /* query */
1512       {
1513         NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
1514         DBUG_ASSERT(ndb_blob != 0);
1515         uint blob_len= query_length;
1516         const char* blob_ptr= query;
1517         r|= ndb_blob->setValue(blob_ptr, blob_len);
1518         DBUG_ASSERT(r == 0);
1519       }
1520       /* node_id */
1521       r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
1522       DBUG_ASSERT(r == 0);
1523       /* epoch */
1524       r|= op->setValue(SCHEMA_EPOCH_I, epoch);
1525       DBUG_ASSERT(r == 0);
1526       /* id */
1527       r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
1528       DBUG_ASSERT(r == 0);
1529       /* version */
1530       r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
1531       DBUG_ASSERT(r == 0);
1532       /* type */
1533       r|= op->setValue(SCHEMA_TYPE_I, log_type);
1534       DBUG_ASSERT(r == 0);
1535       /* any value */
1536       if (!(thd->variables.option_bits & OPTION_BIN_LOG))
1537         r|= op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
1538       else
1539         r|= op->setAnyValue(thd->server_id);
1540       DBUG_ASSERT(r == 0);
1541       if (log_db != new_db && new_db && new_table_name)
1542       {
1543         log_db= new_db;
1544         log_tab= new_table_name;
1545         log_subscribers= bitbuf_e; // no ack expected on this
1546         log_type= (uint32)SOT_RENAME_TABLE_NEW;
1547         continue;
1548       }
1549       break;
1550     }
1551     if (trans->execute(NdbTransaction::Commit) == 0)
1552     {
1553       DBUG_PRINT("info", ("logged: %s", query));
1554       break;
1555     }
1556 err:
1557     const NdbError *this_error= trans ?
1558       &trans->getNdbError() : &ndb->getNdbError();
1559     if (this_error->status == NdbError::TemporaryError)
1560     {
1561       if (retries--)
1562       {
1563         if (trans)
1564           ndb->closeTransaction(trans);
1565         my_sleep(retry_sleep);
1566         continue; // retry
1567       }
1568     }
1569     ndb_error= this_error;
1570     break;
1571   }
1572 end:
1573   if (ndb_error)
1574     push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1575                         ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
1576                         ndb_error->code,
1577                         ndb_error->message,
1578                         "Could not log query '%s' on other mysqld's");
1579 
1580   if (trans)
1581     ndb->closeTransaction(trans);
1582   ndb->setDatabaseName(save_db);
1583 
1584   /*
1585     Wait for other mysqld's to acknowledge the table operation
1586   */
1587   if (ndb_error == 0 &&
1588       !bitmap_is_clear_all(&schema_subscribers))
1589   {
1590     /*
1591       if own nodeid is set we are a single mysqld registred
1592       as an optimization we update the slock directly
1593     */
1594     if (bitmap_is_set(&schema_subscribers, node_id))
1595       ndbcluster_update_slock(thd, db, table_name);
1596     else
1597       dict->forceGCPWait();
1598 
1599     int max_timeout= DEFAULT_SYNC_TIMEOUT;
1600     mysql_mutex_lock(&ndb_schema_object->mutex);
1601     while (1)
1602     {
1603       struct timespec abstime;
1604       int i;
1605       int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
1606       set_timespec(abstime, 1);
1607       int ret= mysql_cond_timedwait(&injector_cond,
1608                                     &ndb_schema_object->mutex,
1609                                     &abstime);
1610       if (thd->killed)
1611         break;
1612 
1613       /* begin protect ndb_schema_share */
1614       mysql_mutex_lock(&ndb_schema_share_mutex);
1615       if (ndb_schema_share == 0)
1616       {
1617         mysql_mutex_unlock(&ndb_schema_share_mutex);
1618         break;
1619       }
1620       mysql_mutex_lock(&ndb_schema_share->mutex);
1621       for (i= 0; i < no_storage_nodes; i++)
1622       {
1623         /* remove any unsubscribed from schema_subscribers */
1624         MY_BITMAP *tmp= &ndb_schema_share->subscriber_bitmap[i];
1625         if (!bitmap_is_clear_all(tmp))
1626           bitmap_intersect(&schema_subscribers, tmp);
1627       }
1628       mysql_mutex_unlock(&ndb_schema_share->mutex);
1629       mysql_mutex_unlock(&ndb_schema_share_mutex);
1630       /* end protect ndb_schema_share */
1631 
1632       /* remove any unsubscribed from ndb_schema_object->slock */
1633       bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers);
1634 
1635       DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
1636                 (uchar*)ndb_schema_object->slock_bitmap.bitmap,
1637                 no_bytes_in_map(&ndb_schema_object->slock_bitmap));
1638 
1639       if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
1640         break;
1641 
1642       if (ret)
1643       {
1644         max_timeout--;
1645         if (max_timeout == 0)
1646         {
1647           sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
1648                           type_str, ndb_schema_object->key);
1649           break;
1650         }
1651         if (opt_ndb_extra_logging)
1652           ndb_report_waiting(type_str, max_timeout,
1653                              "distributing", ndb_schema_object->key);
1654       }
1655     }
1656     mysql_mutex_unlock(&ndb_schema_object->mutex);
1657   }
1658 
1659   if (ndb_schema_object)
1660     ndb_free_schema_object(&ndb_schema_object, FALSE);
1661 
1662   DBUG_RETURN(0);
1663 }
1664 
1665 /*
1666   Handle _non_ data events from the storage nodes
1667 */
1668 int
ndb_handle_schema_change(THD * thd,Ndb * ndb,NdbEventOperation * pOp,NDB_SHARE * share)1669 ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
1670                          NDB_SHARE *share)
1671 {
1672   DBUG_ENTER("ndb_handle_schema_change");
1673   TABLE* table= share->table;
1674   TABLE_SHARE *table_share= share->table_share;
1675   const char *dbname= table_share->db.str;
1676   const char *tabname= table_share->table_name.str;
1677   bool do_close_cached_tables= FALSE;
1678   bool is_online_alter_table= FALSE;
1679   bool is_rename_table= FALSE;
1680   bool is_remote_change=
1681     (uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id();
1682 
1683   if (pOp->getEventType() == NDBEVENT::TE_ALTER)
1684   {
1685     if (pOp->tableFrmChanged())
1686     {
1687       DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: table frm changed"));
1688       is_online_alter_table= TRUE;
1689     }
1690     else
1691     {
1692       DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: name changed"));
1693       DBUG_ASSERT(pOp->tableNameChanged());
1694       is_rename_table= TRUE;
1695     }
1696   }
1697 
1698   {
1699     ndb->setDatabaseName(dbname);
1700     Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname);
1701     const NDBTAB *ev_tab= pOp->getTable();
1702     const NDBTAB *cache_tab= ndbtab_g.get_table();
1703     if (cache_tab &&
1704         cache_tab->getObjectId() == ev_tab->getObjectId() &&
1705         cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
1706       ndbtab_g.invalidate();
1707   }
1708 
1709   /*
1710     Refresh local frm file and dictionary cache if
1711     remote on-line alter table
1712   */
1713   if (is_remote_change && is_online_alter_table)
1714   {
1715     const char *tabname= table_share->table_name.str;
1716     char key[FN_REFLEN + 1];
1717     uchar *data= 0, *pack_data= 0;
1718     size_t length, pack_length;
1719     int error;
1720     NDBDICT *dict= ndb->getDictionary();
1721     const NDBTAB *altered_table= pOp->getTable();
1722 
1723     DBUG_PRINT("info", ("Detected frm change of table %s.%s",
1724                         dbname, tabname));
1725     build_table_filename(key, FN_LEN - 1, dbname, tabname, NullS, 0);
1726     /*
1727       If the there is no local table shadowing the altered table and
1728       it has an frm that is different than the one on disk then
1729       overwrite it with the new table definition
1730     */
1731     if (!ndbcluster_check_if_local_table(dbname, tabname) &&
1732 	readfrm(key, &data, &length) == 0 &&
1733         packfrm(data, length, &pack_data, &pack_length) == 0 &&
1734         cmp_frm(altered_table, pack_data, pack_length))
1735     {
1736       DBUG_DUMP("frm", (uchar*) altered_table->getFrmData(),
1737                 altered_table->getFrmLength());
1738       Ndb_table_guard ndbtab_g(dict, tabname);
1739       const NDBTAB *old= ndbtab_g.get_table();
1740       if (!old &&
1741           old->getObjectVersion() != altered_table->getObjectVersion())
1742         dict->putTable(altered_table);
1743 
1744       my_free(data);
1745       data= NULL;
1746       if ((error= unpackfrm(&data, &length,
1747                             (const uchar*) altered_table->getFrmData())) ||
1748           (error= writefrm(key, data, length)))
1749       {
1750         sql_print_information("NDB: Failed write frm for %s.%s, error %d",
1751                               dbname, tabname, error);
1752       }
1753 
1754       // copy names as memory will be freed
1755       NdbAutoPtr<char> a1((char *)(dbname= strdup(dbname)));
1756       NdbAutoPtr<char> a2((char *)(tabname= strdup(tabname)));
1757       ndbcluster_binlog_close_table(thd, share);
1758 
1759       TABLE_LIST table_list;
1760       bzero((char*) &table_list,sizeof(table_list));
1761       table_list.db= (char *)dbname;
1762       table_list.alias= table_list.table_name= (char *)tabname;
1763       close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
1764 
1765       if ((error= ndbcluster_binlog_open_table(thd, share,
1766                                                table_share, table, 1)))
1767         sql_print_information("NDB: Failed to re-open table %s.%s",
1768                               dbname, tabname);
1769 
1770       table= share->table;
1771       table_share= share->table_share;
1772       dbname= table_share->db.str;
1773       tabname= table_share->table_name.str;
1774     }
1775     my_free(data);
1776     my_free(pack_data);
1777   }
1778 
1779   // If only frm was changed continue replicating
1780   if (is_online_alter_table)
1781   {
1782     /* Signal ha_ndbcluster::alter_table that drop is done */
1783     mysql_cond_signal(&injector_cond);
1784     DBUG_RETURN(0);
1785   }
1786 
1787   mysql_mutex_lock(&share->mutex);
1788   if (is_rename_table && !is_remote_change)
1789   {
1790     DBUG_PRINT("info", ("Detected name change of table %s.%s",
1791                         share->db, share->table_name));
1792     /* ToDo: remove printout */
1793     if (opt_ndb_extra_logging)
1794       sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
1795                             share_prefix, share->table->s->db.str,
1796                             share->table->s->table_name.str,
1797                             share->key);
1798     {
1799       ndb->setDatabaseName(share->table->s->db.str);
1800       Ndb_table_guard ndbtab_g(ndb->getDictionary(),
1801                                share->table->s->table_name.str);
1802       const NDBTAB *ev_tab= pOp->getTable();
1803       const NDBTAB *cache_tab= ndbtab_g.get_table();
1804       if (cache_tab &&
1805           cache_tab->getObjectId() == ev_tab->getObjectId() &&
1806           cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
1807         ndbtab_g.invalidate();
1808     }
1809     /* do the rename of the table in the share */
1810     share->table->s->db.str= share->db;
1811     share->table->s->db.length= strlen(share->db);
1812     share->table->s->table_name.str= share->table_name;
1813     share->table->s->table_name.length= strlen(share->table_name);
1814   }
1815   DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
1816   if (share->op_old == pOp)
1817     share->op_old= 0;
1818   else
1819     share->op= 0;
1820   // either just us or drop table handling as well
1821 
1822   /* Signal ha_ndbcluster::delete/rename_table that drop is done */
1823   mysql_mutex_unlock(&share->mutex);
1824   mysql_cond_signal(&injector_cond);
1825 
1826   mysql_mutex_lock(&ndbcluster_mutex);
1827   /* ndb_share reference binlog free */
1828   DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
1829                            share->key, share->use_count));
1830   free_share(&share, TRUE);
1831   if (is_remote_change && share && share->state != NSS_DROPPED)
1832   {
1833     DBUG_PRINT("info", ("remote change"));
1834     share->state= NSS_DROPPED;
1835     if (share->use_count != 1)
1836     {
1837       /* open handler holding reference */
1838       /* wait with freeing create ndb_share to below */
1839       do_close_cached_tables= TRUE;
1840     }
1841     else
1842     {
1843       /* ndb_share reference create free */
1844       DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
1845                                share->key, share->use_count));
1846       free_share(&share, TRUE);
1847       share= 0;
1848     }
1849   }
1850   else
1851     share= 0;
1852   mysql_mutex_unlock(&ndbcluster_mutex);
1853 
1854   pOp->setCustomData(0);
1855 
1856   mysql_mutex_lock(&injector_mutex);
1857   ndb->dropEventOperation(pOp);
1858   pOp= 0;
1859   mysql_mutex_unlock(&injector_mutex);
1860 
1861   if (do_close_cached_tables)
1862   {
1863     TABLE_LIST table_list;
1864     bzero((char*) &table_list,sizeof(table_list));
1865     table_list.db= (char *)dbname;
1866     table_list.alias= table_list.table_name= (char *)tabname;
1867     close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
1868     /* ndb_share reference create free */
1869     DBUG_PRINT("NDB_SHARE", ("%s create free  use_count: %u",
1870                              share->key, share->use_count));
1871     free_share(&share);
1872   }
1873   DBUG_RETURN(0);
1874 }
1875 
ndb_binlog_query(THD * thd,Cluster_schema * schema)1876 static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
1877 {
1878   if (schema->any_value & NDB_ANYVALUE_RESERVED)
1879   {
1880     if (schema->any_value != NDB_ANYVALUE_FOR_NOLOGGING)
1881       sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
1882                         "query not logged",
1883                         schema->any_value);
1884     return;
1885   }
1886   uint32 thd_server_id_save= thd->server_id;
1887   DBUG_ASSERT(sizeof(thd_server_id_save) == sizeof(thd->server_id));
1888   char *thd_db_save= thd->db;
1889   if (schema->any_value == 0)
1890     thd->server_id= ::server_id;
1891   else
1892     thd->server_id= schema->any_value;
1893   thd->db= schema->db;
1894   int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
1895   thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
1896                     schema->query_length, FALSE, TRUE,
1897                     schema->name[0] == 0 || thd->db[0] == 0,
1898                     errcode);
1899   thd->server_id= thd_server_id_save;
1900   thd->db= thd_db_save;
1901 }
1902 
/*
  Handle one event received on the event operation of the schema share.

  thd                     thread context of the caller
  ndb                     Ndb object, passed on to ndb_handle_schema_change
  pOp                     event operation the event was received on; its
                          custom data is expected to be the NDB_SHARE
  post_epoch_log_list     schema operations to be processed after the
                          epoch is complete are appended here
  post_epoch_unlock_list  schema operations to be acknowledged after the
                          epoch is complete are appended here
  mem_root                memory root used for the list elements

  The event is ignored unless the share stored on pOp is ndb_schema_share.
  TE_INSERT/TE_UPDATE events carry a schema operation which is either
  handled right away or put on the post epoch lists.  All other event
  types are handled in the switch at the end of the function.

  Returns 0 on every visible path.
*/
static int
ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
                                      NdbEventOperation *pOp,
                                      List<Cluster_schema>
                                      *post_epoch_log_list,
                                      List<Cluster_schema>
                                      *post_epoch_unlock_list,
                                      MEM_ROOT *mem_root)
{
  DBUG_ENTER("ndb_binlog_thread_handle_schema_event");
  NDB_SHARE *tmp_share= (NDB_SHARE *)pOp->getCustomData();
  /* only events belonging to the schema share are handled here */
  if (tmp_share && ndb_schema_share == tmp_share)
  {
    NDBEVENT::TableEvent ev_type= pOp->getEventType();
    DBUG_PRINT("enter", ("%s.%s  ev_type: %d",
                         tmp_share->db, tmp_share->table_name, ev_type));
    if (ev_type == NDBEVENT::TE_UPDATE ||
        ev_type == NDBEVENT::TE_INSERT)
    {
      /*
        NOTE(review): the result of sql_alloc is used without a NULL check
      */
      Cluster_schema *schema= (Cluster_schema *)
        sql_alloc(sizeof(Cluster_schema));
      MY_BITMAP slock;
      /*
        slock is set up on top of schema->slock; presumably it reflects
        the slock data once ndbcluster_get_schema has filled in *schema
        - TODO confirm
      */
      bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, FALSE);
      uint node_id= g_ndb_cluster_connection->node_id();
      {
        ndbcluster_get_schema(tmp_share, schema);
        schema->any_value= pOp->getAnyValue();
      }
      enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
      DBUG_PRINT("info",
                 ("%s.%s: log query_length: %d  query: '%s'  type: %d",
                  schema->db, schema->name,
                  schema->query_length, schema->query,
                  schema_type));
      if (schema_type == SOT_CLEAR_SLOCK)
      {
        /*
          handle slock after epoch is completed to ensure that
          schema events get inserted in the binlog after any data
          events
        */
        post_epoch_log_list->push_back(schema, mem_root);
        DBUG_RETURN(0);
      }
      /* schema operations with our own node id are not processed here */
      if (schema->node_id != node_id)
      {
        /*
          log_query:         write the query to the binlog right away
          post_epoch_unlock: acknowledge after the epoch instead of now
        */
        int log_query= 0, post_epoch_unlock= 0;
        switch (schema_type)
        {
        case SOT_DROP_TABLE:
          // fall through
        case SOT_RENAME_TABLE:
          // fall through
        case SOT_RENAME_TABLE_NEW:
          // fall through
        case SOT_ALTER_TABLE:
          post_epoch_log_list->push_back(schema, mem_root);
          /* acknowledge this query _after_ epoch completion */
          post_epoch_unlock= 1;
          break;
	case SOT_TRUNCATE_TABLE:
        {
          char key[FN_REFLEN + 1];
          build_table_filename(key, sizeof(key) - 1,
                               schema->db, schema->name, "", 0);
          /* ndb_share reference temporary, free below */
          NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
          if (share)
          {
            DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                                     share->key, share->use_count));
          }
          // invalidation already handled by binlog thread
          if (!share || !share->op)
          {
            {
              injector_ndb->setDatabaseName(schema->db);
              Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(),
                                       schema->name);
              ndbtab_g.invalidate();
            }
            TABLE_LIST table_list;
            bzero((char*) &table_list,sizeof(table_list));
            table_list.db= schema->db;
            table_list.alias= table_list.table_name= schema->name;
            close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
          }
          /* ndb_share reference temporary free */
          if (share)
          {
            DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                                     share->key, share->use_count));
            free_share(&share);
          }
        }
        // fall through
        case SOT_CREATE_TABLE:
          if (ndbcluster_check_if_local_table(schema->db, schema->name))
          {
            DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
                                schema->db, schema->name));
            sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
                            "binlog schema event '%s' from node %d. ",
                            schema->db, schema->name, schema->query,
                            schema->node_id);
          }
          else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
          {
            print_could_not_discover_error(thd, schema);
          }
          /* the query is logged in both the skipped and the discovered case */
          log_query= 1;
          break;
        case SOT_DROP_DB:
          /* Drop the database locally if it only contains ndb tables */
          if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db))
          {
            const int no_print_error[1]= {0};
            run_query(thd, schema->query,
                      schema->query + schema->query_length,
                      no_print_error,    /* print error */
                      TRUE);   /* don't binlog the query */
            /* binlog dropping database after any table operations */
            post_epoch_log_list->push_back(schema, mem_root);
            /* acknowledge this query _after_ epoch completion */
            post_epoch_unlock= 1;
          }
          else
          {
            /* Database contained local tables, leave it */
            sql_print_error("NDB Binlog: Skipping drop database '%s' since it contained local tables "
                            "binlog schema event '%s' from node %d. ",
                            schema->db, schema->query,
                            schema->node_id);
            log_query= 1;
          }
          break;
        case SOT_CREATE_DB:
          /* fall through */
        case SOT_ALTER_DB:
        {
          const int no_print_error[1]= {0};
          run_query(thd, schema->query,
                    schema->query + schema->query_length,
                    no_print_error,    /* print error */
                    TRUE);   /* don't binlog the query */
          log_query= 1;
          break;
        }
        case SOT_TABLESPACE:
        case SOT_LOGFILE_GROUP:
          log_query= 1;
          break;
        case SOT_CLEAR_SLOCK:
          /* SOT_CLEAR_SLOCK has already returned above */
          abort();
        }
        if (log_query && ndb_binlog_running)
          ndb_binlog_query(thd, schema);
        /* signal that schema operation has been handled */
        DBUG_DUMP("slock", (uchar*) schema->slock, schema->slock_length);
        /* acknowledge only if the bit of this node is set in slock */
        if (bitmap_is_set(&slock, node_id))
        {
          if (post_epoch_unlock)
            post_epoch_unlock_list->push_back(schema, mem_root);
          else
            ndbcluster_update_slock(thd, schema->db, schema->name);
        }
      }
      DBUG_RETURN(0);
    }
    /*
      the normal case of UPDATE/INSERT has already been handled
    */
    switch (ev_type)
    {
    case NDBEVENT::TE_DELETE:
      // skip
      break;
    case NDBEVENT::TE_CLUSTER_FAILURE:
      if (opt_ndb_extra_logging)
        sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
                              ndb_schema_share->key, (unsigned) pOp->getGCI());
      // fall through
    case NDBEVENT::TE_DROP:
      if (opt_ndb_extra_logging &&
          ndb_binlog_tables_inited && ndb_binlog_running)
        sql_print_information("NDB Binlog: ndb tables initially "
                              "read only on reconnect.");

      /* begin protect ndb_schema_share */
      mysql_mutex_lock(&ndb_schema_share_mutex);
      /* ndb_share reference binlog extra free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
                               ndb_schema_share->key,
                               ndb_schema_share->use_count));
      free_share(&ndb_schema_share);
      ndb_schema_share= 0;
      ndb_binlog_tables_inited= 0;
      mysql_mutex_unlock(&ndb_schema_share_mutex);
      /* end protect ndb_schema_share */

      close_cached_tables(NULL, NULL, FALSE, LONG_TIMEOUT);
      // fall through
    case NDBEVENT::TE_ALTER:
      /* tmp_share still refers to the share although ndb_schema_share may be 0 now */
      ndb_handle_schema_change(thd, ndb, pOp, tmp_share);
      break;
    case NDBEVENT::TE_NODE_FAILURE:
    {
      /* forget all subscribers registered for the failed data node */
      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
      DBUG_ASSERT(node_id != 0xFF);
      mysql_mutex_lock(&tmp_share->mutex);
      bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]);
      DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
      if (opt_ndb_extra_logging)
      {
        sql_print_information("NDB Binlog: Node: %d, down,"
                              " Subscriber bitmask %x%x",
                              pOp->getNdbdNodeId(),
                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
      }
      mysql_mutex_unlock(&tmp_share->mutex);
      mysql_cond_signal(&injector_cond);
      break;
    }
    case NDBEVENT::TE_SUBSCRIBE:
    {
      /* record req_id as subscriber in the bitmap of the data node */
      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
      uint8 req_id= pOp->getReqNodeId();
      DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
      mysql_mutex_lock(&tmp_share->mutex);
      bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
      DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
      if (opt_ndb_extra_logging)
      {
        sql_print_information("NDB Binlog: Node: %d, subscribe from node %d,"
                              " Subscriber bitmask %x%x",
                              pOp->getNdbdNodeId(),
                              req_id,
                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
      }
      mysql_mutex_unlock(&tmp_share->mutex);
      mysql_cond_signal(&injector_cond);
      break;
    }
    case NDBEVENT::TE_UNSUBSCRIBE:
    {
      /* remove req_id as subscriber from the bitmap of the data node */
      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
      uint8 req_id= pOp->getReqNodeId();
      DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
      mysql_mutex_lock(&tmp_share->mutex);
      bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
      DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
      if (opt_ndb_extra_logging)
      {
        sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d,"
                              " Subscriber bitmask %x%x",
                              pOp->getNdbdNodeId(),
                              req_id,
                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
      }
      mysql_mutex_unlock(&tmp_share->mutex);
      mysql_cond_signal(&injector_cond);
      break;
    }
    default:
      sql_print_error("NDB Binlog: unknown non data event %d for %s. "
                      "Ignoring...", (unsigned) ev_type, tmp_share->key);
    }
  }
  DBUG_RETURN(0);
}
2176 
/*
  process any operations that should be done after
  the epoch is complete

  thd                     thread context of the caller
  post_epoch_log_list     schema operations to process; emptied by this call
  post_epoch_unlock_list  schema operations to acknowledge through
                          ndbcluster_update_slock once the log list has
                          been processed; emptied by this call

  Nothing is done, including for post_epoch_unlock_list, when
  post_epoch_log_list is empty.
*/
static void
ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
                                                 List<Cluster_schema>
                                                 *post_epoch_log_list,
                                                 List<Cluster_schema>
                                                 *post_epoch_unlock_list)
{
  /* plain return is used here since DBUG_ENTER has not been done yet */
  if (post_epoch_log_list->elements == 0)
    return;
  DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
  Cluster_schema *schema;
  while ((schema= post_epoch_log_list->pop()))
  {
    DBUG_PRINT("info",
               ("%s.%s: log query_length: %d  query: '%s'  type: %d",
                schema->db, schema->name,
                schema->query_length, schema->query,
                schema->type));
    /* set to 1 by the cases below whose query is written to the binlog */
    int log_query= 0;
    {
      enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
      char key[FN_REFLEN + 1];
      build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
      if (schema_type == SOT_CLEAR_SLOCK)
      {
        /*
          copy the received slock into the schema object registered
          under this key, if there is one, and signal injector_cond
        */
        mysql_mutex_lock(&ndbcluster_mutex);
        NDB_SCHEMA_OBJECT *ndb_schema_object=
          (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
                                              (uchar*) key, strlen(key));
        if (ndb_schema_object)
        {
          mysql_mutex_lock(&ndb_schema_object->mutex);
          memcpy(ndb_schema_object->slock, schema->slock,
                 sizeof(ndb_schema_object->slock));
          DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
                    (uchar*)ndb_schema_object->slock_bitmap.bitmap,
                    no_bytes_in_map(&ndb_schema_object->slock_bitmap));
          mysql_mutex_unlock(&ndb_schema_object->mutex);
          mysql_cond_signal(&injector_cond);
        }
        mysql_mutex_unlock(&ndbcluster_mutex);
        continue;
      }
      /* ndb_share reference temporary, free below */
      NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
      if (share)
      {
        DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                                 share->key, share->use_count));
      }
      switch (schema_type)
      {
      case SOT_DROP_DB:
        log_query= 1;
        break;
      case SOT_DROP_TABLE:
        log_query= 1;
        // invalidation already handled by binlog thread
        if (share && share->op)
        {
          break;
        }
        // fall through
      case SOT_RENAME_TABLE:
        // fall through
      case SOT_ALTER_TABLE:
        // invalidation already handled by binlog thread
        if (!share || !share->op)
        {
          {
            injector_ndb->setDatabaseName(schema->db);
            Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(),
                                     schema->name);
            ndbtab_g.invalidate();
          }
          TABLE_LIST table_list;
          bzero((char*) &table_list,sizeof(table_list));
          table_list.db= schema->db;
          table_list.alias= table_list.table_name= schema->name;
          close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
        }
        /* only SOT_ALTER_TABLE continues into the case below */
        if (schema_type != SOT_ALTER_TABLE)
          break;
        // fall through
      case SOT_RENAME_TABLE_NEW:
        log_query= 1;
        if (ndb_binlog_running && (!share || !share->op))
        {
          /*
            we need to free any share here as command below
            may need to call handle_trailing_share
          */
          if (share)
          {
            /* ndb_share reference temporary free */
            DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                                     share->key, share->use_count));
            free_share(&share);
            share= 0;
          }
          if (ndbcluster_check_if_local_table(schema->db, schema->name))
          {
            DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
                                schema->db, schema->name));
            sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
                            "binlog schema event '%s' from node %d. ",
                            schema->db, schema->name, schema->query,
                            schema->node_id);
          }
          else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
          {
            print_could_not_discover_error(thd, schema);
          }
        }
        break;
      default:
        DBUG_ASSERT(FALSE);
      }
      /* release the temporary reference unless it was already freed above */
      if (share)
      {
        /* ndb_share reference temporary free */
        DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                                 share->key, share->use_count));
        free_share(&share);
        share= 0;
      }
    }
    if (ndb_binlog_running && log_query)
      ndb_binlog_query(thd, schema);
  }
  /* acknowledge the operations that were deferred until after the epoch */
  while ((schema= post_epoch_unlock_list->pop()))
  {
    ndbcluster_update_slock(thd, schema->db, schema->name);
  }
  DBUG_VOID_RETURN;
}
2317 
2318 /*
2319   Timer class for doing performance measurements
2320 */
2321 
2322 /*********************************************************************
  Internal helper functions for handling of the cluster replication tables
2324   - ndb_binlog_index
2325   - ndb_apply_status
2326 *********************************************************************/
2327 
/*
  struct to hold the data to be inserted into the
  ndb_binlog_index table

  ndb_add_ndb_binlog_index stores each member in one field of the
  table; the field index used is noted per member.
*/
struct ndb_binlog_index_row {
  ulonglong gci;                /* stored in field[2] */
  const char *master_log_file;  /* stored in field[1]; length taken with strlen */
  ulonglong master_log_pos;     /* stored in field[0] */
  ulonglong n_inserts;          /* stored in field[3] */
  ulonglong n_updates;          /* stored in field[4] */
  ulonglong n_deletes;          /* stored in field[5] */
  ulonglong n_schemaops;        /* stored in field[6] */
};
2341 
2342 /*
2343   Open the ndb_binlog_index table
2344 */
open_ndb_binlog_index(THD * thd,TABLE ** ndb_binlog_index)2345 static int open_ndb_binlog_index(THD *thd, TABLE **ndb_binlog_index)
2346 {
2347   static char repdb[]= NDB_REP_DB;
2348   static char reptable[]= NDB_REP_TABLE;
2349   const char *save_proc_info= thd->proc_info;
2350   TABLE_LIST *tables= &binlog_tables;
2351 
2352   tables->init_one_table(repdb, strlen(repdb), reptable, strlen(reptable),
2353                          reptable, TL_WRITE);
2354   thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE;
2355 
2356   tables->required_type= FRMTYPE_TABLE;
2357   thd->clear_error();
2358   if (open_and_lock_tables(thd, tables, FALSE, 0))
2359   {
2360     if (thd->killed)
2361       sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed");
2362     else
2363       sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'",
2364                       thd->stmt_da->sql_errno(),
2365                       thd->stmt_da->message());
2366     thd->proc_info= save_proc_info;
2367     return -1;
2368   }
2369   *ndb_binlog_index= tables->table;
2370   thd->proc_info= save_proc_info;
2371   (*ndb_binlog_index)->use_all_columns();
2372   return 0;
2373 }
2374 
2375 
2376 /*
2377   Insert one row in the ndb_binlog_index
2378 */
2379 
ndb_add_ndb_binlog_index(THD * thd,void * _row)2380 int ndb_add_ndb_binlog_index(THD *thd, void *_row)
2381 {
2382   ndb_binlog_index_row &row= *(ndb_binlog_index_row *) _row;
2383   int error= 0;
2384   /*
2385     Turn of binlogging to prevent the table changes to be written to
2386     the binary log.
2387   */
2388   ulong saved_options= thd->variables.option_bits;
2389   thd->variables.option_bits&= ~OPTION_BIN_LOG;
2390 
2391   if (!ndb_binlog_index && open_ndb_binlog_index(thd, &ndb_binlog_index))
2392   {
2393     sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
2394     error= -1;
2395     goto add_ndb_binlog_index_err;
2396   }
2397 
2398   /*
2399     Intialize ndb_binlog_index->record[0]
2400   */
2401   empty_record(ndb_binlog_index);
2402 
2403   ndb_binlog_index->field[0]->store(row.master_log_pos);
2404   ndb_binlog_index->field[1]->store(row.master_log_file,
2405                                 strlen(row.master_log_file),
2406                                 &my_charset_bin);
2407   ndb_binlog_index->field[2]->store(row.gci);
2408   ndb_binlog_index->field[3]->store(row.n_inserts);
2409   ndb_binlog_index->field[4]->store(row.n_updates);
2410   ndb_binlog_index->field[5]->store(row.n_deletes);
2411   ndb_binlog_index->field[6]->store(row.n_schemaops);
2412 
2413   if ((error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0])))
2414   {
2415     sql_print_error("NDB Binlog: Writing row to ndb_binlog_index: %d", error);
2416     error= -1;
2417     goto add_ndb_binlog_index_err;
2418   }
2419 
2420 add_ndb_binlog_index_err:
2421   thd->stmt_da->can_overwrite_status= TRUE;
2422   thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
2423   thd->stmt_da->can_overwrite_status= FALSE;
2424   close_thread_tables(thd);
2425   /*
2426     There should be no need for rolling back transaction due to deadlock
2427     (since ndb_binlog_index is non transactional).
2428   */
2429   DBUG_ASSERT(! thd->transaction_rollback_request);
2430 
2431   thd->mdl_context.release_transactional_locks();
2432   ndb_binlog_index= 0;
2433   thd->variables.option_bits= saved_options;
2434   return error;
2435 }
2436 
2437 /*********************************************************************
2438   Functions for start, stop, wait for ndbcluster binlog thread
2439 *********************************************************************/
2440 
/*
  Values held by do_ndbcluster_binlog_close_connection.
  NOTE(review): the code reading and writing the variable is outside
  this part of the file; judging by the names only, the values
  presumably tell the binlog thread to keep running, to exit or to
  restart - confirm against the thread function.
*/
enum Binlog_thread_state
{
  BCCC_running= 0,
  BCCC_exit= 1,
  BCCC_restart= 2
};

/* File local state variable; its initial value is BCCC_restart */
static enum Binlog_thread_state do_ndbcluster_binlog_close_connection= BCCC_restart;
2449 
ndbcluster_binlog_start()2450 int ndbcluster_binlog_start()
2451 {
2452   DBUG_ENTER("ndbcluster_binlog_start");
2453 
2454   if (::server_id == 0)
2455   {
2456     sql_print_warning("NDB: server id set to zero will cause any other mysqld "
2457                       "with bin log to log with wrong server id");
2458   }
2459   else if (::server_id & 0x1 << 31)
2460   {
2461     sql_print_error("NDB: server id's with high bit set is reserved for internal "
2462                     "purposes");
2463     DBUG_RETURN(-1);
2464   }
2465 
2466   mysql_mutex_init(key_injector_mutex, &injector_mutex, MY_MUTEX_INIT_FAST);
2467   mysql_cond_init(key_injector_cond, &injector_cond, NULL);
2468   mysql_mutex_init(key_ndb_schema_share_mutex,
2469                    &ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
2470 
2471   /* Create injector thread */
2472   if (mysql_thread_create(key_thread_ndb_binlog,
2473                           &ndb_binlog_thread, &connection_attrib,
2474                           ndb_binlog_thread_func, 0))
2475   {
2476     DBUG_PRINT("error", ("Could not create ndb injector thread"));
2477     mysql_cond_destroy(&injector_cond);
2478     mysql_mutex_destroy(&injector_mutex);
2479     DBUG_RETURN(-1);
2480   }
2481 
2482   ndbcluster_binlog_inited= 1;
2483 
2484   /* Wait for the injector thread to start */
2485   mysql_mutex_lock(&injector_mutex);
2486   while (!ndb_binlog_thread_running)
2487     mysql_cond_wait(&injector_cond, &injector_mutex);
2488   mysql_mutex_unlock(&injector_mutex);
2489 
2490   if (ndb_binlog_thread_running < 0)
2491     DBUG_RETURN(-1);
2492 
2493   DBUG_RETURN(0);
2494 }
2495 
2496 
2497 /**************************************************************
2498   Internal helper functions for creating/dropping ndb events
2499   used by the client sql threads
2500 **************************************************************/
2501 void
ndb_rep_event_name(String * event_name,const char * db,const char * tbl)2502 ndb_rep_event_name(String *event_name,const char *db, const char *tbl)
2503 {
2504   event_name->set_ascii("REPL$", 5);
2505   event_name->append(db);
2506   if (tbl)
2507   {
2508     event_name->append('/');
2509     event_name->append(tbl);
2510   }
2511 }
2512 
2513 bool
ndbcluster_check_if_local_table(const char * dbname,const char * tabname)2514 ndbcluster_check_if_local_table(const char *dbname, const char *tabname)
2515 {
2516   char key[FN_REFLEN + 1];
2517   char ndb_file[FN_REFLEN + 1];
2518 
2519   DBUG_ENTER("ndbcluster_check_if_local_table");
2520   build_table_filename(key, FN_LEN-1, dbname, tabname, reg_ext, 0);
2521   build_table_filename(ndb_file, FN_LEN-1, dbname, tabname, ha_ndb_ext, 0);
2522   /* Check that any defined table is an ndb table */
2523   DBUG_PRINT("info", ("Looking for file %s and %s", key, ndb_file));
2524   if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK))
2525   {
2526     DBUG_PRINT("info", ("table file %s not on disk, local table", ndb_file));
2527 
2528 
2529     DBUG_RETURN(true);
2530   }
2531 
2532   DBUG_RETURN(false);
2533 }
2534 
2535 bool
ndbcluster_check_if_local_tables_in_db(THD * thd,const char * dbname)2536 ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname)
2537 {
2538   DBUG_ENTER("ndbcluster_check_if_local_tables_in_db");
2539   DBUG_PRINT("info", ("Looking for files in directory %s", dbname));
2540   LEX_STRING *tabname;
2541   List<LEX_STRING> files;
2542   char path[FN_REFLEN + 1];
2543 
2544   build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0);
2545   if (find_files(thd, &files, dbname, path, NullS, 0, NULL) !=
2546       FIND_FILES_OK)
2547   {
2548     DBUG_PRINT("info", ("Failed to find files"));
2549     DBUG_RETURN(true);
2550   }
2551   DBUG_PRINT("info",("found: %d files", files.elements));
2552   while ((tabname= files.pop()))
2553   {
2554     DBUG_PRINT("info", ("Found table %s", tabname->str));
2555     if (ndbcluster_check_if_local_table(dbname, tabname->str))
2556       DBUG_RETURN(true);
2557   }
2558 
2559   DBUG_RETURN(false);
2560 }
2561 
2562 /*
2563   Common function for setting up everything for logging a table at
2564   create/discover.
2565 */
ndbcluster_create_binlog_setup(Ndb * ndb,const char * key,uint key_len,const char * db,const char * table_name,my_bool share_may_exist)2566 int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
2567                                    uint key_len,
2568                                    const char *db,
2569                                    const char *table_name,
2570                                    my_bool share_may_exist)
2571 {
2572   int do_event_op= ndb_binlog_running;
2573   DBUG_ENTER("ndbcluster_create_binlog_setup");
2574   DBUG_PRINT("enter",("key: %s  key_len: %d  %s.%s  share_may_exist: %d",
2575                       key, key_len, db, table_name, share_may_exist));
2576   DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
2577   DBUG_ASSERT(strlen(key) == key_len);
2578 
2579   mysql_mutex_lock(&ndbcluster_mutex);
2580 
2581   /* Handle any trailing share */
2582   NDB_SHARE *share= (NDB_SHARE*) my_hash_search(&ndbcluster_open_tables,
2583                                                 (uchar*) key, key_len);
2584 
2585   if (share && share_may_exist)
2586   {
2587     if (share->flags & NSF_NO_BINLOG ||
2588         share->op != 0 ||
2589         share->op_old != 0)
2590     {
2591       mysql_mutex_unlock(&ndbcluster_mutex);
2592       DBUG_RETURN(0); // replication already setup, or should not
2593     }
2594   }
2595 
2596   if (share)
2597   {
2598     if (share->op || share->op_old)
2599     {
2600       my_errno= HA_ERR_TABLE_EXIST;
2601       mysql_mutex_unlock(&ndbcluster_mutex);
2602       DBUG_RETURN(1);
2603     }
2604     if (!share_may_exist || share->connect_count !=
2605         g_ndb_cluster_connection->get_connect_count())
2606     {
2607       handle_trailing_share(share);
2608       share= NULL;
2609     }
2610   }
2611 
2612   /* Create share which is needed to hold replication information */
2613   if (share)
2614   {
2615     /* ndb_share reference create */
2616     ++share->use_count;
2617     DBUG_PRINT("NDB_SHARE", ("%s create  use_count: %u",
2618                              share->key, share->use_count));
2619   }
2620   /* ndb_share reference create */
2621   else if (!(share= get_share(key, 0, TRUE, TRUE)))
2622   {
2623     sql_print_error("NDB Binlog: "
2624                     "allocating table share for %s failed", key);
2625   }
2626   else
2627   {
2628     DBUG_PRINT("NDB_SHARE", ("%s create  use_count: %u",
2629                              share->key, share->use_count));
2630   }
2631 
2632   if (!ndb_schema_share &&
2633       strcmp(share->db, NDB_REP_DB) == 0 &&
2634       strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
2635     do_event_op= 1;
2636   else if (!ndb_apply_status_share &&
2637            strcmp(share->db, NDB_REP_DB) == 0 &&
2638            strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
2639     do_event_op= 1;
2640 
2641   if (!do_event_op)
2642   {
2643     share->flags|= NSF_NO_BINLOG;
2644     mysql_mutex_unlock(&ndbcluster_mutex);
2645     DBUG_RETURN(0);
2646   }
2647   mysql_mutex_unlock(&ndbcluster_mutex);
2648 
2649   while (share && !IS_TMP_PREFIX(table_name))
2650   {
2651     /*
2652       ToDo make sanity check of share so that the table is actually the same
2653       I.e. we need to do open file from frm in this case
2654       Currently awaiting this to be fixed in the 4.1 tree in the general
2655       case
2656     */
2657 
2658     /* Create the event in NDB */
2659     ndb->setDatabaseName(db);
2660 
2661     NDBDICT *dict= ndb->getDictionary();
2662     Ndb_table_guard ndbtab_g(dict, table_name);
2663     const NDBTAB *ndbtab= ndbtab_g.get_table();
2664     if (ndbtab == 0)
2665     {
2666       if (opt_ndb_extra_logging)
2667         sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
2668                               "%s, %d", key, dict->getNdbError().message,
2669                               dict->getNdbError().code);
2670       break; // error
2671     }
2672     String event_name(INJECTOR_EVENT_LEN);
2673     ndb_rep_event_name(&event_name, db, table_name);
2674     /*
2675       event should have been created by someone else,
2676       but let's make sure, and create if it doesn't exist
2677     */
2678     const NDBEVENT *ev= dict->getEvent(event_name.c_ptr());
2679     if (!ev)
2680     {
2681       if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share))
2682       {
2683         sql_print_error("NDB Binlog: "
2684                         "FAILED CREATE (DISCOVER) TABLE Event: %s",
2685                         event_name.c_ptr());
2686         break; // error
2687       }
2688       if (opt_ndb_extra_logging)
2689         sql_print_information("NDB Binlog: "
2690                               "CREATE (DISCOVER) TABLE Event: %s",
2691                               event_name.c_ptr());
2692     }
2693     else
2694     {
2695       delete ev;
2696       if (opt_ndb_extra_logging)
2697         sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
2698                               event_name.c_ptr());
2699     }
2700 
2701     /*
2702       create the event operations for receiving logging events
2703     */
2704     if (ndbcluster_create_event_ops(share, ndbtab, event_name.c_ptr()))
2705     {
2706       sql_print_error("NDB Binlog:"
2707                       "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
2708                       event_name.c_ptr());
2709       /* a warning has been issued to the client */
2710       DBUG_RETURN(0);
2711     }
2712     DBUG_RETURN(0);
2713   }
2714   DBUG_RETURN(-1);
2715 }
2716 
2717 int
ndbcluster_create_event(Ndb * ndb,const NDBTAB * ndbtab,const char * event_name,NDB_SHARE * share,int push_warning)2718 ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
2719                         const char *event_name, NDB_SHARE *share,
2720                         int push_warning)
2721 {
2722   THD *thd= current_thd;
2723   DBUG_ENTER("ndbcluster_create_event");
2724   DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
2725                       ndbtab->getName(), ndbtab->getObjectVersion(),
2726                       event_name, share ? share->key : "(nil)"));
2727   DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
2728   if (!share)
2729   {
2730     DBUG_PRINT("info", ("share == NULL"));
2731     DBUG_RETURN(0);
2732   }
2733   if (share->flags & NSF_NO_BINLOG)
2734   {
2735     DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d",
2736                         share->flags, share->flags & NSF_NO_BINLOG));
2737     DBUG_RETURN(0);
2738   }
2739 
2740   NDBDICT *dict= ndb->getDictionary();
2741   NDBEVENT my_event(event_name);
2742   my_event.setTable(*ndbtab);
2743   my_event.addTableEvent(NDBEVENT::TE_ALL);
2744   if (share->flags & NSF_HIDDEN_PK)
2745   {
2746     if (share->flags & NSF_BLOB_FLAG)
2747     {
2748       sql_print_error("NDB Binlog: logging of table %s "
2749                       "with BLOB attribute and no PK is not supported",
2750                       share->key);
2751       if (push_warning)
2752         push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
2753                             ER_ILLEGAL_HA_CREATE_OPTION,
2754                             ER(ER_ILLEGAL_HA_CREATE_OPTION),
2755                             ndbcluster_hton_name,
2756                             "Binlog of table with BLOB attribute and no PK");
2757 
2758       share->flags|= NSF_NO_BINLOG;
2759       DBUG_RETURN(-1);
2760     }
2761     /* No primary key, subscribe for all attributes */
2762     my_event.setReport(NDBEVENT::ER_ALL);
2763     DBUG_PRINT("info", ("subscription all"));
2764   }
2765   else
2766   {
2767     if (ndb_schema_share || strcmp(share->db, NDB_REP_DB) ||
2768         strcmp(share->table_name, NDB_SCHEMA_TABLE))
2769     {
2770       my_event.setReport(NDBEVENT::ER_UPDATED);
2771       DBUG_PRINT("info", ("subscription only updated"));
2772     }
2773     else
2774     {
2775       my_event.setReport((NDBEVENT::EventReport)
2776                          (NDBEVENT::ER_ALL | NDBEVENT::ER_SUBSCRIBE));
2777       DBUG_PRINT("info", ("subscription all and subscribe"));
2778     }
2779   }
2780   if (share->flags & NSF_BLOB_FLAG)
2781     my_event.mergeEvents(TRUE);
2782 
2783   /* add all columns to the event */
2784   int n_cols= ndbtab->getNoOfColumns();
2785   for(int a= 0; a < n_cols; a++)
2786     my_event.addEventColumn(a);
2787 
2788   if (dict->createEvent(my_event)) // Add event to database
2789   {
2790     if (dict->getNdbError().classification != NdbError::SchemaObjectExists)
2791     {
2792       /*
2793         failed, print a warning
2794       */
2795       if (push_warning > 1)
2796         push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
2797                             ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2798                             dict->getNdbError().code,
2799                             dict->getNdbError().message, "NDB");
2800       sql_print_error("NDB Binlog: Unable to create event in database. "
2801                       "Event: %s  Error Code: %d  Message: %s", event_name,
2802                       dict->getNdbError().code, dict->getNdbError().message);
2803       DBUG_RETURN(-1);
2804     }
2805 
2806     /*
2807       try retrieving the event, if table version/id matches, we will get
2808       a valid event.  Otherwise we have a trailing event from before
2809     */
2810     const NDBEVENT *ev;
2811     if ((ev= dict->getEvent(event_name)))
2812     {
2813       delete ev;
2814       DBUG_RETURN(0);
2815     }
2816 
2817     /*
2818       trailing event from before; an error, but try to correct it
2819     */
2820     if (dict->getNdbError().code == NDB_INVALID_SCHEMA_OBJECT &&
2821         dict->dropEvent(my_event.getName()))
2822     {
2823       if (push_warning > 1)
2824         push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
2825                             ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2826                             dict->getNdbError().code,
2827                             dict->getNdbError().message, "NDB");
2828       sql_print_error("NDB Binlog: Unable to create event in database. "
2829                       " Attempt to correct with drop failed. "
2830                       "Event: %s Error Code: %d Message: %s",
2831                       event_name,
2832                       dict->getNdbError().code,
2833                       dict->getNdbError().message);
2834       DBUG_RETURN(-1);
2835     }
2836 
2837     /*
2838       try to add the event again
2839     */
2840     if (dict->createEvent(my_event))
2841     {
2842       if (push_warning > 1)
2843         push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
2844                             ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2845                             dict->getNdbError().code,
2846                             dict->getNdbError().message, "NDB");
2847       sql_print_error("NDB Binlog: Unable to create event in database. "
2848                       " Attempt to correct with drop ok, but create failed. "
2849                       "Event: %s Error Code: %d Message: %s",
2850                       event_name,
2851                       dict->getNdbError().code,
2852                       dict->getNdbError().message);
2853       DBUG_RETURN(-1);
2854     }
2855 #ifdef NDB_BINLOG_EXTRA_WARNINGS
2856     push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
2857                         ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2858                         0, "NDB Binlog: Removed trailing event",
2859                         "NDB");
2860 #endif
2861   }
2862 
2863   DBUG_RETURN(0);
2864 }
2865 
is_ndb_compatible_type(Field * field)2866 inline int is_ndb_compatible_type(Field *field)
2867 {
2868   return
2869     !(field->flags & BLOB_FLAG) &&
2870     field->type() != MYSQL_TYPE_BIT &&
2871     field->pack_length() != 0;
2872 }
2873 
2874 /*
2875   - create eventOperations for receiving log events
2876   - setup ndb recattrs for reception of log event data
2877   - "start" the event operation
2878 
2879   used at create/discover of tables
2880 */
2881 int
ndbcluster_create_event_ops(NDB_SHARE * share,const NDBTAB * ndbtab,const char * event_name)2882 ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
2883                             const char *event_name)
2884 {
2885   THD *thd= current_thd;
2886   /*
2887     we are in either create table or rename table so table should be
2888     locked, hence we can work with the share without locks
2889   */
2890 
2891   DBUG_ENTER("ndbcluster_create_event_ops");
2892   DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name));
2893   DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
2894 
2895   DBUG_ASSERT(share != 0);
2896 
2897   if (share->flags & NSF_NO_BINLOG)
2898   {
2899     DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x",
2900                         share->flags));
2901     DBUG_RETURN(0);
2902   }
2903 
2904   int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0;
2905   if (!ndb_schema_share && strcmp(share->db, NDB_REP_DB) == 0 &&
2906       strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
2907     do_ndb_schema_share= 1;
2908   else if (!ndb_apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
2909            strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
2910     do_ndb_apply_status_share= 1;
2911   else if (!binlog_filter->db_ok(share->db) || !ndb_binlog_running)
2912   {
2913     share->flags|= NSF_NO_BINLOG;
2914     DBUG_RETURN(0);
2915   }
2916 
2917   if (share->op)
2918   {
2919     assert(share->op->getCustomData() == (void *) share);
2920 
2921     DBUG_ASSERT(share->use_count > 1);
2922     sql_print_error("NDB Binlog: discover reusing old ev op");
2923     /* ndb_share reference ToDo free */
2924     DBUG_PRINT("NDB_SHARE", ("%s ToDo free  use_count: %u",
2925                              share->key, share->use_count));
2926     free_share(&share); // old event op already has reference
2927     DBUG_RETURN(0);
2928   }
2929 
2930   TABLE *table= share->table;
2931 
2932   int retries= 100;
2933   /*
2934     100 milliseconds, temporary error on schema operation can
2935     take some time to be resolved
2936   */
2937   int retry_sleep= 100;
2938   while (1)
2939   {
2940     mysql_mutex_lock(&injector_mutex);
2941     Ndb *ndb= injector_ndb;
2942     if (do_ndb_schema_share)
2943       ndb= schema_ndb;
2944 
2945     if (ndb == 0)
2946     {
2947       mysql_mutex_unlock(&injector_mutex);
2948       DBUG_RETURN(-1);
2949     }
2950 
2951     NdbEventOperation* op;
2952     if (do_ndb_schema_share)
2953       op= ndb->createEventOperation(event_name);
2954     else
2955     {
2956       // set injector_ndb database/schema from table internal name
2957       int ret= ndb->setDatabaseAndSchemaName(ndbtab);
2958       assert(ret == 0);
2959       op= ndb->createEventOperation(event_name);
2960       // reset to catch errors
2961       ndb->setDatabaseName("");
2962     }
2963     if (!op)
2964     {
2965       sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
2966                       " %s",event_name);
2967       push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
2968                           ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2969                           ndb->getNdbError().code,
2970                           ndb->getNdbError().message,
2971                           "NDB");
2972       mysql_mutex_unlock(&injector_mutex);
2973       DBUG_RETURN(-1);
2974     }
2975 
2976     if (share->flags & NSF_BLOB_FLAG)
2977       op->mergeEvents(TRUE); // currently not inherited from event
2978 
2979     DBUG_PRINT("info", ("share->ndb_value[0]: 0x%lx  share->ndb_value[1]: 0x%lx",
2980                         (long) share->ndb_value[0],
2981                         (long) share->ndb_value[1]));
2982     int n_columns= ndbtab->getNoOfColumns();
2983     int n_fields= table ? table->s->fields : 0; // XXX ???
2984     for (int j= 0; j < n_columns; j++)
2985     {
2986       const char *col_name= ndbtab->getColumn(j)->getName();
2987       NdbValue attr0, attr1;
2988       if (j < n_fields)
2989       {
2990         Field *f= share->table->field[j];
2991         if (is_ndb_compatible_type(f))
2992         {
2993           DBUG_PRINT("info", ("%s compatible", col_name));
2994           attr0.rec= op->getValue(col_name, (char*) f->ptr);
2995           attr1.rec= op->getPreValue(col_name,
2996                                      (f->ptr - share->table->record[0]) +
2997                                      (char*) share->table->record[1]);
2998         }
2999         else if (! (f->flags & BLOB_FLAG))
3000         {
3001           DBUG_PRINT("info", ("%s non compatible", col_name));
3002           attr0.rec= op->getValue(col_name);
3003           attr1.rec= op->getPreValue(col_name);
3004         }
3005         else
3006         {
3007           DBUG_PRINT("info", ("%s blob", col_name));
3008           DBUG_ASSERT(share->flags & NSF_BLOB_FLAG);
3009           attr0.blob= op->getBlobHandle(col_name);
3010           attr1.blob= op->getPreBlobHandle(col_name);
3011           if (attr0.blob == NULL || attr1.blob == NULL)
3012           {
3013             sql_print_error("NDB Binlog: Creating NdbEventOperation"
3014                             " blob field %u handles failed (code=%d) for %s",
3015                             j, op->getNdbError().code, event_name);
3016             push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
3017                                 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
3018                                 op->getNdbError().code,
3019                                 op->getNdbError().message,
3020                                 "NDB");
3021             ndb->dropEventOperation(op);
3022             mysql_mutex_unlock(&injector_mutex);
3023             DBUG_RETURN(-1);
3024           }
3025         }
3026       }
3027       else
3028       {
3029         DBUG_PRINT("info", ("%s hidden key", col_name));
3030         attr0.rec= op->getValue(col_name);
3031         attr1.rec= op->getPreValue(col_name);
3032       }
3033       share->ndb_value[0][j].ptr= attr0.ptr;
3034       share->ndb_value[1][j].ptr= attr1.ptr;
3035       DBUG_PRINT("info", ("&share->ndb_value[0][%d]: 0x%lx  "
3036                           "share->ndb_value[0][%d]: 0x%lx",
3037                           j, (long) &share->ndb_value[0][j],
3038                           j, (long) attr0.ptr));
3039       DBUG_PRINT("info", ("&share->ndb_value[1][%d]: 0x%lx  "
3040                           "share->ndb_value[1][%d]: 0x%lx",
3041                           j, (long) &share->ndb_value[0][j],
3042                           j, (long) attr1.ptr));
3043     }
3044     op->setCustomData((void *) share); // set before execute
3045     share->op= op; // assign op in NDB_SHARE
3046     if (op->execute())
3047     {
3048       share->op= NULL;
3049       retries--;
3050       if (op->getNdbError().status != NdbError::TemporaryError &&
3051           op->getNdbError().code != 1407)
3052         retries= 0;
3053       if (retries == 0)
3054       {
3055         push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
3056                             ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
3057                             op->getNdbError().code, op->getNdbError().message,
3058                             "NDB");
3059         sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
3060                         event_name,
3061                         op->getNdbError().code, op->getNdbError().message);
3062       }
3063       ndb->dropEventOperation(op);
3064       mysql_mutex_unlock(&injector_mutex);
3065       if (retries)
3066       {
3067         my_sleep(retry_sleep);
3068         continue;
3069       }
3070       DBUG_RETURN(-1);
3071     }
3072     mysql_mutex_unlock(&injector_mutex);
3073     break;
3074   }
3075 
3076   /* ndb_share reference binlog */
3077   get_share(share);
3078   DBUG_PRINT("NDB_SHARE", ("%s binlog  use_count: %u",
3079                            share->key, share->use_count));
3080   if (do_ndb_apply_status_share)
3081   {
3082     /* ndb_share reference binlog extra */
3083     ndb_apply_status_share= get_share(share);
3084     DBUG_PRINT("NDB_SHARE", ("%s binlog extra  use_count: %u",
3085                              share->key, share->use_count));
3086     mysql_cond_signal(&injector_cond);
3087   }
3088   else if (do_ndb_schema_share)
3089   {
3090     /* ndb_share reference binlog extra */
3091     ndb_schema_share= get_share(share);
3092     DBUG_PRINT("NDB_SHARE", ("%s binlog extra  use_count: %u",
3093                              share->key, share->use_count));
3094     mysql_cond_signal(&injector_cond);
3095   }
3096 
3097   DBUG_PRINT("info",("%s share->op: 0x%lx  share->use_count: %u",
3098                      share->key, (long) share->op, share->use_count));
3099 
3100   if (opt_ndb_extra_logging)
3101     sql_print_information("NDB Binlog: logging %s", share->key);
3102   DBUG_RETURN(0);
3103 }
3104 
3105 /*
3106   when entering the calling thread should have a share lock id share != 0
3107   then the injector thread will have  one as well, i.e. share->use_count == 0
3108   (unless it has already dropped... then share->op == 0)
3109 */
3110 int
ndbcluster_handle_drop_table(Ndb * ndb,const char * event_name,NDB_SHARE * share,const char * type_str)3111 ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
3112                              NDB_SHARE *share, const char *type_str)
3113 {
3114   DBUG_ENTER("ndbcluster_handle_drop_table");
3115   THD *thd= current_thd;
3116 
3117   NDBDICT *dict= ndb->getDictionary();
3118   if (event_name && dict->dropEvent(event_name))
3119   {
3120     if (dict->getNdbError().code != 4710)
3121     {
3122       /* drop event failed for some reason, issue a warning */
3123       push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
3124                           ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
3125                           dict->getNdbError().code,
3126                           dict->getNdbError().message, "NDB");
3127       /* error is not that the event did not exist */
3128       sql_print_error("NDB Binlog: Unable to drop event in database. "
3129                       "Event: %s Error Code: %d Message: %s",
3130                       event_name,
3131                       dict->getNdbError().code,
3132                       dict->getNdbError().message);
3133       /* ToDo; handle error? */
3134       if (share && share->op &&
3135           share->op->getState() == NdbEventOperation::EO_EXECUTING &&
3136           dict->getNdbError().mysql_code != HA_ERR_NO_CONNECTION)
3137       {
3138         DBUG_ASSERT(FALSE);
3139         DBUG_RETURN(-1);
3140       }
3141     }
3142   }
3143 
3144   if (share == 0 || share->op == 0)
3145   {
3146     DBUG_RETURN(0);
3147   }
3148 
3149 /*
3150   Syncronized drop between client thread and injector thread is
3151   neccessary in order to maintain ordering in the binlog,
3152   such that the drop occurs _after_ any inserts/updates/deletes.
3153 
3154   The penalty for this is that the drop table becomes slow.
3155 
3156   This wait is however not strictly neccessary to produce a binlog
3157   that is usable.  However the slave does not currently handle
3158   these out of order, thus we are keeping the SYNC_DROP_ defined
3159   for now.
3160 */
3161   const char *save_proc_info= thd->proc_info;
3162 #define SYNC_DROP_
3163 #ifdef SYNC_DROP_
3164   thd->proc_info= "Syncing ndb table schema operation and binlog";
3165   mysql_mutex_lock(&share->mutex);
3166   int max_timeout= DEFAULT_SYNC_TIMEOUT;
3167   while (share->op)
3168   {
3169     struct timespec abstime;
3170     set_timespec(abstime, 1);
3171     int ret= mysql_cond_timedwait(&injector_cond,
3172                                   &share->mutex,
3173                                   &abstime);
3174     if (thd->killed ||
3175         share->op == 0)
3176       break;
3177     if (ret)
3178     {
3179       max_timeout--;
3180       if (max_timeout == 0)
3181       {
3182         sql_print_error("NDB %s: %s timed out. Ignoring...",
3183                         type_str, share->key);
3184         break;
3185       }
3186       if (opt_ndb_extra_logging)
3187         ndb_report_waiting(type_str, max_timeout,
3188                            type_str, share->key);
3189     }
3190   }
3191   mysql_mutex_unlock(&share->mutex);
3192 #else
3193   mysql_mutex_lock(&share->mutex);
3194   share->op_old= share->op;
3195   share->op= 0;
3196   mysql_mutex_unlock(&share->mutex);
3197 #endif
3198   thd->proc_info= save_proc_info;
3199 
3200   DBUG_RETURN(0);
3201 }
3202 
3203 
/********************************************************************
  Internal helper functions for different events from the storage nodes
  used by the ndb injector thread
********************************************************************/
3208 
3209 /*
3210   Handle error states on events from the storage nodes
3211 */
ndb_binlog_thread_handle_error(Ndb * ndb,NdbEventOperation * pOp,ndb_binlog_index_row & row)3212 static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp,
3213                                           ndb_binlog_index_row &row)
3214 {
3215   NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
3216   DBUG_ENTER("ndb_binlog_thread_handle_error");
3217 
3218   int overrun= pOp->isOverrun();
3219   if (overrun)
3220   {
3221     /*
3222       ToDo: this error should rather clear the ndb_binlog_index...
3223       and continue
3224     */
3225     sql_print_error("NDB Binlog: Overrun in event buffer, "
3226                     "this means we have dropped events. Cannot "
3227                     "continue binlog for %s", share->key);
3228     pOp->clearError();
3229     DBUG_RETURN(-1);
3230   }
3231 
3232   if (!pOp->isConsistent())
3233   {
3234     /*
3235       ToDo: this error should rather clear the ndb_binlog_index...
3236       and continue
3237     */
3238     sql_print_error("NDB Binlog: Not Consistent. Cannot "
3239                     "continue binlog for %s. Error code: %d"
3240                     " Message: %s", share->key,
3241                     pOp->getNdbError().code,
3242                     pOp->getNdbError().message);
3243     pOp->clearError();
3244     DBUG_RETURN(-1);
3245   }
3246   sql_print_error("NDB Binlog: unhandled error %d for table %s",
3247                   pOp->hasError(), share->key);
3248   pOp->clearError();
3249   DBUG_RETURN(0);
3250 }
3251 
3252 static int
ndb_binlog_thread_handle_non_data_event(THD * thd,Ndb * ndb,NdbEventOperation * pOp,ndb_binlog_index_row & row)3253 ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
3254                                         NdbEventOperation *pOp,
3255                                         ndb_binlog_index_row &row)
3256 {
3257   NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
3258   NDBEVENT::TableEvent type= pOp->getEventType();
3259 
3260   switch (type)
3261   {
3262   case NDBEVENT::TE_CLUSTER_FAILURE:
3263     if (opt_ndb_extra_logging)
3264       sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
3265                             share->key, (unsigned) pOp->getGCI());
3266     if (ndb_apply_status_share == share)
3267     {
3268       if (opt_ndb_extra_logging &&
3269           ndb_binlog_tables_inited && ndb_binlog_running)
3270         sql_print_information("NDB Binlog: ndb tables initially "
3271                               "read only on reconnect.");
3272       /* ndb_share reference binlog extra free */
3273       DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
3274                                share->key, share->use_count));
3275       free_share(&ndb_apply_status_share);
3276       ndb_apply_status_share= 0;
3277       ndb_binlog_tables_inited= 0;
3278     }
3279     DBUG_PRINT("error", ("CLUSTER FAILURE EVENT: "
3280                         "%s  received share: 0x%lx  op: 0x%lx  share op: 0x%lx  "
3281                         "op_old: 0x%lx",
3282                          share->key, (long) share, (long) pOp,
3283                          (long) share->op, (long) share->op_old));
3284     break;
3285   case NDBEVENT::TE_DROP:
3286     if (ndb_apply_status_share == share)
3287     {
3288       if (opt_ndb_extra_logging &&
3289           ndb_binlog_tables_inited && ndb_binlog_running)
3290         sql_print_information("NDB Binlog: ndb tables initially "
3291                               "read only on reconnect.");
3292       /* ndb_share reference binlog extra free */
3293       DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
3294                                share->key, share->use_count));
3295       free_share(&ndb_apply_status_share);
3296       ndb_apply_status_share= 0;
3297       ndb_binlog_tables_inited= 0;
3298     }
3299     /* ToDo: remove printout */
3300     if (opt_ndb_extra_logging)
3301       sql_print_information("NDB Binlog: drop table %s.", share->key);
3302     // fall through
3303   case NDBEVENT::TE_ALTER:
3304     row.n_schemaops++;
3305     DBUG_PRINT("info", ("TABLE %s  EVENT: %s  received share: 0x%lx  op: 0x%lx  "
3306                         "share op: 0x%lx  op_old: 0x%lx",
3307                         type == NDBEVENT::TE_DROP ? "DROP" : "ALTER",
3308                         share->key, (long) share, (long) pOp,
3309                         (long) share->op, (long) share->op_old));
3310     break;
3311   case NDBEVENT::TE_NODE_FAILURE:
3312     /* fall through */
3313   case NDBEVENT::TE_SUBSCRIBE:
3314     /* fall through */
3315   case NDBEVENT::TE_UNSUBSCRIBE:
3316     /* ignore */
3317     return 0;
3318   default:
3319     sql_print_error("NDB Binlog: unknown non data event %d for %s. "
3320                     "Ignoring...", (unsigned) type, share->key);
3321     return 0;
3322   }
3323 
3324   ndb_handle_schema_change(thd, ndb, pOp, share);
3325   return 0;
3326 }
3327 
3328 /*
3329   Handle data events from the storage nodes
3330 */
3331 static int
ndb_binlog_thread_handle_data_event(Ndb * ndb,NdbEventOperation * pOp,ndb_binlog_index_row & row,injector::transaction & trans)3332 ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
3333                                     ndb_binlog_index_row &row,
3334                                     injector::transaction &trans)
3335 {
3336   NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
3337   if (share == ndb_apply_status_share)
3338     return 0;
3339 
3340   uint32 originating_server_id= pOp->getAnyValue();
3341   if (originating_server_id == 0)
3342     originating_server_id= ::server_id;
3343   else if (originating_server_id & NDB_ANYVALUE_RESERVED)
3344   {
3345     if (originating_server_id != NDB_ANYVALUE_FOR_NOLOGGING)
3346       sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
3347                         "event not logged",
3348                         originating_server_id);
3349     return 0;
3350   }
3351   else if (!g_ndb_log_slave_updates)
3352   {
3353     /*
3354       This event comes from a slave applier since it has an originating
3355       server id set. Since option to log slave updates is not set, skip it.
3356     */
3357     return 0;
3358   }
3359 
3360   TABLE *table= share->table;
3361   DBUG_ASSERT(trans.good());
3362   DBUG_ASSERT(table != 0);
3363 
3364   dbug_print_table("table", table);
3365 
3366   TABLE_SHARE *table_s= table->s;
3367   uint n_fields= table_s->fields;
3368   MY_BITMAP b;
3369   /* Potential buffer for the bitmap */
3370   uint32 bitbuf[128 / (sizeof(uint32) * 8)];
3371   bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL,
3372               n_fields, FALSE);
3373   bitmap_set_all(&b);
3374 
3375   /*
3376    row data is already in table->record[0]
3377    As we told the NdbEventOperation to do this
3378    (saves moving data about many times)
3379   */
3380 
3381   /*
3382     for now malloc/free blobs buffer each time
3383     TODO if possible share single permanent buffer with handlers
3384    */
3385   uchar* blobs_buffer[2] = { 0, 0 };
3386   uint blobs_buffer_size[2] = { 0, 0 };
3387 
3388   switch(pOp->getEventType())
3389   {
3390   case NDBEVENT::TE_INSERT:
3391     row.n_inserts++;
3392     DBUG_PRINT("info", ("INSERT INTO %s.%s",
3393                         table_s->db.str, table_s->table_name.str));
3394     {
3395       if (share->flags & NSF_BLOB_FLAG)
3396       {
3397         my_ptrdiff_t ptrdiff= 0;
3398         int ret __attribute__((unused))= get_ndb_blobs_value(table, share->ndb_value[0],
3399                                                blobs_buffer[0],
3400                                                blobs_buffer_size[0],
3401                                                ptrdiff);
3402         DBUG_ASSERT(ret == 0);
3403       }
3404       ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
3405       int ret __attribute__((unused))= trans.write_row(originating_server_id,
3406                                         injector::transaction::table(table,
3407                                                                      TRUE),
3408                                         &b, n_fields, table->record[0]);
3409       DBUG_ASSERT(ret == 0);
3410     }
3411     break;
3412   case NDBEVENT::TE_DELETE:
3413     row.n_deletes++;
3414     DBUG_PRINT("info",("DELETE FROM %s.%s",
3415                        table_s->db.str, table_s->table_name.str));
3416     {
3417       /*
3418         table->record[0] contains only the primary key in this case
3419         since we do not have an after image
3420       */
3421       int n;
3422       if (table->s->primary_key != MAX_KEY)
3423         n= 0; /*
3424                 use the primary key only as it save time and space and
3425                 it is the only thing needed to log the delete
3426               */
3427       else
3428         n= 1; /*
3429                 we use the before values since we don't have a primary key
3430                 since the mysql server does not handle the hidden primary
3431                 key
3432               */
3433 
3434       if (share->flags & NSF_BLOB_FLAG)
3435       {
3436         my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
3437         int ret __attribute__((unused))= get_ndb_blobs_value(table, share->ndb_value[n],
3438                                                blobs_buffer[n],
3439                                                blobs_buffer_size[n],
3440                                                ptrdiff);
3441         DBUG_ASSERT(ret == 0);
3442       }
3443       ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
3444       DBUG_EXECUTE("info", print_records(table, table->record[n]););
3445       int ret __attribute__((unused))= trans.delete_row(originating_server_id,
3446                                           injector::transaction::table(table,
3447                                                                        TRUE),
3448                                           &b, n_fields, table->record[n]);
3449       DBUG_ASSERT(ret == 0);
3450     }
3451     break;
3452   case NDBEVENT::TE_UPDATE:
3453     row.n_updates++;
3454     DBUG_PRINT("info", ("UPDATE %s.%s",
3455                         table_s->db.str, table_s->table_name.str));
3456     {
3457       if (share->flags & NSF_BLOB_FLAG)
3458       {
3459         my_ptrdiff_t ptrdiff= 0;
3460         int ret __attribute__((unused))= get_ndb_blobs_value(table, share->ndb_value[0],
3461                                                blobs_buffer[0],
3462                                                blobs_buffer_size[0],
3463                                                ptrdiff);
3464         DBUG_ASSERT(ret == 0);
3465       }
3466       ndb_unpack_record(table, share->ndb_value[0],
3467                         &b, table->record[0]);
3468       DBUG_EXECUTE("info", print_records(table, table->record[0]););
3469       if (table->s->primary_key != MAX_KEY)
3470       {
3471         /*
3472           since table has a primary key, we can do a write
3473           using only after values
3474         */
3475         trans.write_row(originating_server_id,
3476                         injector::transaction::table(table, TRUE),
3477                         &b, n_fields, table->record[0]);// after values
3478       }
3479       else
3480       {
3481         /*
3482           mysql server cannot handle the ndb hidden key and
3483           therefore needs the before image as well
3484         */
3485         if (share->flags & NSF_BLOB_FLAG)
3486         {
3487           my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
3488           int ret __attribute__((unused))= get_ndb_blobs_value(table, share->ndb_value[1],
3489                                                  blobs_buffer[1],
3490                                                  blobs_buffer_size[1],
3491                                                  ptrdiff);
3492           DBUG_ASSERT(ret == 0);
3493         }
3494         ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
3495         DBUG_EXECUTE("info", print_records(table, table->record[1]););
3496         int ret __attribute__((unused))= trans.update_row(originating_server_id,
3497                                             injector::transaction::table(table,
3498                                                                          TRUE),
3499                                             &b, n_fields,
3500                                             table->record[1], // before values
3501                                             table->record[0]);// after values
3502         DBUG_ASSERT(ret == 0);
3503       }
3504     }
3505     break;
3506   default:
3507     /* We should REALLY never get here. */
3508     DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
3509     break;
3510   }
3511 
3512   if (share->flags & NSF_BLOB_FLAG)
3513   {
3514     my_free(blobs_buffer[0]);
3515     my_free(blobs_buffer[1]);
3516   }
3517 
3518   return 0;
3519 }
3520 
3521 //#define RUN_NDB_BINLOG_TIMER
3522 #ifdef RUN_NDB_BINLOG_TIMER
3523 class Timer
3524 {
3525 public:
Timer()3526   Timer() { start(); }
start()3527   void start() { gettimeofday(&m_start, 0); }
stop()3528   void stop() { gettimeofday(&m_stop, 0); }
elapsed_ms()3529   ulong elapsed_ms()
3530   {
3531     return (ulong)
3532       (((longlong) m_stop.tv_sec - (longlong) m_start.tv_sec) * 1000 +
3533        ((longlong) m_stop.tv_usec -
3534         (longlong) m_start.tv_usec + 999) / 1000);
3535   }
3536 private:
3537   struct timeval m_start,m_stop;
3538 };
3539 #endif
3540 
3541 /****************************************************************
3542   Injector thread main loop
3543 ****************************************************************/
3544 
3545 static uchar *
ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT * schema_object,size_t * length,my_bool not_used)3546 ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT *schema_object,
3547                            size_t *length,
3548                            my_bool not_used __attribute__((unused)))
3549 {
3550   *length= schema_object->key_length;
3551   return (uchar*) schema_object->key;
3552 }
3553 
ndb_get_schema_object(const char * key,my_bool create_if_not_exists,my_bool have_lock)3554 static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
3555                                                 my_bool create_if_not_exists,
3556                                                 my_bool have_lock)
3557 {
3558   NDB_SCHEMA_OBJECT *ndb_schema_object;
3559   uint length= (uint) strlen(key);
3560   DBUG_ENTER("ndb_get_schema_object");
3561   DBUG_PRINT("enter", ("key: '%s'", key));
3562 
3563   if (!have_lock)
3564     mysql_mutex_lock(&ndbcluster_mutex);
3565   while (!(ndb_schema_object=
3566            (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
3567                                                (uchar*) key,
3568                                                length)))
3569   {
3570     if (!create_if_not_exists)
3571     {
3572       DBUG_PRINT("info", ("does not exist"));
3573       break;
3574     }
3575     if (!(ndb_schema_object=
3576           (NDB_SCHEMA_OBJECT*) my_malloc(sizeof(*ndb_schema_object) + length + 1,
3577                                          MYF(MY_WME | MY_ZEROFILL))))
3578     {
3579       DBUG_PRINT("info", ("malloc error"));
3580       break;
3581     }
3582     ndb_schema_object->key= (char *)(ndb_schema_object+1);
3583     memcpy(ndb_schema_object->key, key, length + 1);
3584     ndb_schema_object->key_length= length;
3585     if (my_hash_insert(&ndb_schema_objects, (uchar*) ndb_schema_object))
3586     {
3587       my_free(ndb_schema_object);
3588       break;
3589     }
3590     mysql_mutex_init(key_ndb_schema_object_mutex, &ndb_schema_object->mutex, MY_MUTEX_INIT_FAST);
3591     bitmap_init(&ndb_schema_object->slock_bitmap, ndb_schema_object->slock,
3592                 sizeof(ndb_schema_object->slock)*8, FALSE);
3593     bitmap_clear_all(&ndb_schema_object->slock_bitmap);
3594     break;
3595   }
3596   if (ndb_schema_object)
3597   {
3598     ndb_schema_object->use_count++;
3599     DBUG_PRINT("info", ("use_count: %d", ndb_schema_object->use_count));
3600   }
3601   if (!have_lock)
3602     mysql_mutex_unlock(&ndbcluster_mutex);
3603   DBUG_RETURN(ndb_schema_object);
3604 }
3605 
3606 
ndb_free_schema_object(NDB_SCHEMA_OBJECT ** ndb_schema_object,bool have_lock)3607 static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
3608                                    bool have_lock)
3609 {
3610   DBUG_ENTER("ndb_free_schema_object");
3611   DBUG_PRINT("enter", ("key: '%s'", (*ndb_schema_object)->key));
3612   if (!have_lock)
3613     mysql_mutex_lock(&ndbcluster_mutex);
3614   if (!--(*ndb_schema_object)->use_count)
3615   {
3616     DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
3617     my_hash_delete(&ndb_schema_objects, (uchar*) *ndb_schema_object);
3618     mysql_mutex_destroy(&(*ndb_schema_object)->mutex);
3619     my_free(*ndb_schema_object);
3620     *ndb_schema_object= 0;
3621   }
3622   else
3623   {
3624     DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
3625   }
3626   if (!have_lock)
3627     mysql_mutex_unlock(&ndbcluster_mutex);
3628   DBUG_VOID_RETURN;
3629 }
3630 
3631 extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
3632 extern ulong opt_ndb_report_thresh_binlog_mem_usage;
3633 
ndb_binlog_thread_func(void * arg)3634 pthread_handler_t ndb_binlog_thread_func(void *arg)
3635 {
3636   THD *thd; /* needs to be first for thread_stack */
3637   Ndb *i_ndb= 0;
3638   Ndb *s_ndb= 0;
3639   Thd_ndb *thd_ndb=0;
3640   int ndb_update_ndb_binlog_index= 1;
3641   injector *inj= injector::instance();
3642   uint incident_id= 0;
3643 
3644 #ifdef RUN_NDB_BINLOG_TIMER
3645   Timer main_timer;
3646 #endif
3647 
3648   mysql_mutex_lock(&injector_mutex);
3649   /*
3650     Set up the Thread
3651   */
3652   my_thread_init();
3653   DBUG_ENTER("ndb_binlog_thread");
3654 
3655   thd= new THD; /* note that contructor of THD uses DBUG_ */
3656   THD_CHECK_SENTRY(thd);
3657   thd->set_current_stmt_binlog_format_row();
3658 
3659   /* We need to set thd->thread_id before thd->store_globals, or it will
3660      set an invalid value for thd->variables.pseudo_thread_id.
3661   */
3662   mysql_mutex_lock(&LOCK_thread_count);
3663   thd->thread_id= thread_id++;
3664   mysql_mutex_unlock(&LOCK_thread_count);
3665 
3666   mysql_thread_set_psi_id(thd->thread_id);
3667 
3668   thd->thread_stack= (char*) &thd; /* remember where our stack is */
3669   if (thd->store_globals())
3670   {
3671     thd->cleanup();
3672     delete thd;
3673     ndb_binlog_thread_running= -1;
3674     mysql_mutex_unlock(&injector_mutex);
3675     mysql_cond_signal(&injector_cond);
3676 
3677     DBUG_LEAVE;                               // Must match DBUG_ENTER()
3678     my_thread_end();
3679     pthread_exit(0);
3680     return NULL;                              // Avoid compiler warnings
3681   }
3682 
3683   thd->init_for_queries();
3684   thd->command= COM_DAEMON;
3685   thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
3686   thd->main_security_ctx.host_or_ip= "";
3687   thd->client_capabilities= 0;
3688   my_net_init(&thd->net, 0);
3689   thd->main_security_ctx.master_access= ~0;
3690   thd->main_security_ctx.priv_user[0]= 0;
3691   /* Do not use user-supplied timeout value for system threads. */
3692   thd->variables.lock_wait_timeout= LONG_TIMEOUT;
3693 
3694   /*
3695     Set up ndb binlog
3696   */
3697   sql_print_information("Starting MySQL Cluster Binlog Thread");
3698 
3699   pthread_detach_this_thread();
3700   thd->real_id= pthread_self();
3701   mysql_mutex_lock(&LOCK_thread_count);
3702   threads.append(thd);
3703   mysql_mutex_unlock(&LOCK_thread_count);
3704   thd->lex->start_transaction_opt= 0;
3705 
3706   if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
3707       s_ndb->init())
3708   {
3709     sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
3710     ndb_binlog_thread_running= -1;
3711     mysql_mutex_unlock(&injector_mutex);
3712     mysql_cond_signal(&injector_cond);
3713     goto err;
3714   }
3715 
3716   // empty database
3717   if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
3718       i_ndb->init())
3719   {
3720     sql_print_error("NDB Binlog: Getting Ndb object failed");
3721     ndb_binlog_thread_running= -1;
3722     mysql_mutex_unlock(&injector_mutex);
3723     mysql_cond_signal(&injector_cond);
3724     goto err;
3725   }
3726 
3727   /* init hash for schema object distribution */
3728   (void) my_hash_init(&ndb_schema_objects, system_charset_info, 32, 0, 0,
3729                    (my_hash_get_key)ndb_schema_objects_get_key, 0, 0);
3730 
3731   /*
3732     Expose global reference to our ndb object.
3733 
3734     Used by both sql client thread and binlog thread to interact
3735     with the storage
3736     mysql_mutex_lock(&injector_mutex);
3737   */
3738   injector_thd= thd;
3739   injector_ndb= i_ndb;
3740   p_latest_trans_gci=
3741     injector_ndb->get_ndb_cluster_connection().get_latest_trans_gci();
3742   schema_ndb= s_ndb;
3743 
3744   if (opt_bin_log)
3745   {
3746     ndb_binlog_running= TRUE;
3747   }
3748 
3749   /* Thread start up completed  */
3750   ndb_binlog_thread_running= 1;
3751   mysql_mutex_unlock(&injector_mutex);
3752   mysql_cond_signal(&injector_cond);
3753 
3754   /*
3755     wait for mysql server to start (so that the binlog is started
3756     and thus can receive the first GAP event)
3757   */
3758   mysql_mutex_lock(&LOCK_server_started);
3759   while (!mysqld_server_started)
3760   {
3761     struct timespec abstime;
3762     set_timespec(abstime, 1);
3763     mysql_cond_timedwait(&COND_server_started, &LOCK_server_started,
3764                          &abstime);
3765     if (ndbcluster_terminating)
3766     {
3767       mysql_mutex_unlock(&LOCK_server_started);
3768       goto err;
3769     }
3770   }
3771   mysql_mutex_unlock(&LOCK_server_started);
3772 restart:
3773   /*
3774     Main NDB Injector loop
3775   */
3776   while (ndb_binlog_running)
3777   {
3778     /*
3779       check if it is the first log, if so we do not insert a GAP event
3780       as there is really no log to have a GAP in
3781     */
3782     if (incident_id == 0)
3783     {
3784       LOG_INFO log_info;
3785       mysql_bin_log.get_current_log(&log_info);
3786       int len=  strlen(log_info.log_file_name);
3787       uint no= 0;
3788       if ((sscanf(log_info.log_file_name + len - 6, "%u", &no) == 1) &&
3789           no == 1)
3790       {
3791         /* this is the first log, so skip GAP event */
3792         break;
3793       }
3794     }
3795 
3796     /*
3797       Always insert a GAP event as we cannot know what has happened
3798       in the cluster while not being connected.
3799     */
3800     LEX_STRING const msg[2]=
3801       {
3802         { C_STRING_WITH_LEN("mysqld startup")    },
3803         { C_STRING_WITH_LEN("cluster disconnect")}
3804       };
3805     int error __attribute__((unused))=
3806       inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg[incident_id]);
3807     DBUG_ASSERT(!error);
3808     break;
3809   }
3810   incident_id= 1;
3811   {
3812     thd->proc_info= "Waiting for ndbcluster to start";
3813 
3814     mysql_mutex_lock(&injector_mutex);
3815     while (!ndb_schema_share ||
3816            (ndb_binlog_running && !ndb_apply_status_share))
3817     {
3818       /* ndb not connected yet */
3819       struct timespec abstime;
3820       set_timespec(abstime, 1);
3821       mysql_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
3822       if (ndbcluster_binlog_terminating)
3823       {
3824         mysql_mutex_unlock(&injector_mutex);
3825         goto err;
3826       }
3827     }
3828     mysql_mutex_unlock(&injector_mutex);
3829 
3830     if (thd_ndb == NULL)
3831     {
3832       DBUG_ASSERT(ndbcluster_hton->slot != ~(uint)0);
3833       if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
3834       {
3835         sql_print_error("Could not allocate Thd_ndb object");
3836         goto err;
3837       }
3838       set_thd_ndb(thd, thd_ndb);
3839       thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
3840       thd->query_id= 0; // to keep valgrind quiet
3841     }
3842   }
3843 
3844   {
3845     // wait for the first event
3846     thd->proc_info= "Waiting for first event from ndbcluster";
3847     int schema_res, res;
3848     Uint64 schema_gci;
3849     do
3850     {
3851       DBUG_PRINT("info", ("Waiting for the first event"));
3852 
3853       if (ndbcluster_binlog_terminating)
3854         goto err;
3855 
3856       schema_res= s_ndb->pollEvents(100, &schema_gci);
3857     } while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci);
3858     if (ndb_binlog_running)
3859     {
3860       Uint64 gci= i_ndb->getLatestGCI();
3861       while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch)
3862       {
3863         if (ndbcluster_binlog_terminating)
3864           goto err;
3865         res= i_ndb->pollEvents(10, &gci);
3866       }
3867       if (gci > schema_gci)
3868       {
3869         schema_gci= gci;
3870       }
3871     }
3872     // now check that we have epochs consistent with what we had before the restart
3873     DBUG_PRINT("info", ("schema_res: %d  schema_gci: %lu", schema_res,
3874                         (long) schema_gci));
3875     {
3876       i_ndb->flushIncompleteEvents(schema_gci);
3877       s_ndb->flushIncompleteEvents(schema_gci);
3878       if (schema_gci < ndb_latest_handled_binlog_epoch)
3879       {
3880         sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
3881                         "ndb_latest_handled_binlog_epoch: %u, while current epoch: %u. "
3882                         "RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.",
3883                         (unsigned) ndb_latest_handled_binlog_epoch, (unsigned) schema_gci);
3884         *p_latest_trans_gci= 0;
3885         ndb_latest_handled_binlog_epoch= 0;
3886         ndb_latest_applied_binlog_epoch= 0;
3887         ndb_latest_received_binlog_epoch= 0;
3888       }
3889       else if (ndb_latest_applied_binlog_epoch > 0)
3890       {
3891         sql_print_warning("NDB Binlog: cluster has reconnected. "
3892                           "Changes to the database that occured while "
3893                           "disconnected will not be in the binlog");
3894       }
3895       if (opt_ndb_extra_logging)
3896       {
3897         sql_print_information("NDB Binlog: starting log at epoch %u",
3898                               (unsigned)schema_gci);
3899       }
3900     }
3901   }
3902   {
3903     static char db[]= "";
3904     thd->db= db;
3905   }
3906   do_ndbcluster_binlog_close_connection= BCCC_running;
3907   for ( ; !((ndbcluster_binlog_terminating ||
3908              do_ndbcluster_binlog_close_connection) &&
3909             ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci) &&
3910           do_ndbcluster_binlog_close_connection != BCCC_restart; )
3911   {
3912 #ifndef DBUG_OFF
3913     if (do_ndbcluster_binlog_close_connection)
3914     {
3915       DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection: %d, "
3916                           "ndb_latest_handled_binlog_epoch: %lu, "
3917                           "*p_latest_trans_gci: %lu",
3918                           do_ndbcluster_binlog_close_connection,
3919                           (ulong) ndb_latest_handled_binlog_epoch,
3920                           (ulong) *p_latest_trans_gci));
3921     }
3922 #endif
3923 #ifdef RUN_NDB_BINLOG_TIMER
3924     main_timer.stop();
3925     sql_print_information("main_timer %ld ms",  main_timer.elapsed_ms());
3926     main_timer.start();
3927 #endif
3928 
3929     /*
3930       now we don't want any events before next gci is complete
3931     */
3932     thd->proc_info= "Waiting for event from ndbcluster";
3933     thd->set_time();
3934 
3935     /* wait for event or 1000 ms */
3936     Uint64 gci= 0, schema_gci;
3937     int res= 0, tot_poll_wait= 1000;
3938     if (ndb_binlog_running)
3939     {
3940       res= i_ndb->pollEvents(tot_poll_wait, &gci);
3941       tot_poll_wait= 0;
3942     }
3943     else
3944     {
3945       /*
3946         Just consume any events, not used if no binlogging
3947         e.g. node failure events
3948       */
3949       Uint64 tmp_gci;
3950       if (i_ndb->pollEvents(0, &tmp_gci))
3951         while (i_ndb->nextEvent())
3952           ;
3953     }
3954     int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
3955     ndb_latest_received_binlog_epoch= gci;
3956 
3957     while (gci > schema_gci && schema_res >= 0)
3958     {
3959       static char buf[64];
3960       thd->proc_info= "Waiting for schema epoch";
3961       my_snprintf(buf, sizeof(buf), "%s %u(%u)", thd->proc_info, (unsigned) schema_gci, (unsigned) gci);
3962       thd->proc_info= buf;
3963       schema_res= s_ndb->pollEvents(10, &schema_gci);
3964     }
3965 
3966     if ((ndbcluster_binlog_terminating ||
3967          do_ndbcluster_binlog_close_connection) &&
3968         (ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci ||
3969          !ndb_binlog_running))
3970       break; /* Shutting down server */
3971 
3972     if (ndb_binlog_index && ndb_binlog_index->s->has_old_version())
3973     {
3974       if (ndb_binlog_index->s->has_old_version())
3975       {
3976         trans_commit_stmt(thd);
3977         close_thread_tables(thd);
3978         thd->mdl_context.release_transactional_locks();
3979         ndb_binlog_index= 0;
3980       }
3981     }
3982 
3983     MEM_ROOT **root_ptr=
3984       my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
3985     MEM_ROOT *old_root= *root_ptr;
3986     MEM_ROOT mem_root;
3987     init_sql_alloc(&mem_root, 4096, 0);
3988     List<Cluster_schema> post_epoch_log_list;
3989     List<Cluster_schema> post_epoch_unlock_list;
3990     *root_ptr= &mem_root;
3991 
3992     if (unlikely(schema_res > 0))
3993     {
3994       thd->proc_info= "Processing events from schema table";
3995       s_ndb->
3996         setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
3997       s_ndb->
3998         setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
3999       NdbEventOperation *pOp= s_ndb->nextEvent();
4000       while (pOp != NULL)
4001       {
4002         if (!pOp->hasError())
4003         {
4004           ndb_binlog_thread_handle_schema_event(thd, s_ndb, pOp,
4005                                                 &post_epoch_log_list,
4006                                                 &post_epoch_unlock_list,
4007                                                 &mem_root);
4008           DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
4009                               s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
4010                               "<empty>"));
4011           DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
4012                               i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
4013                               "<empty>"));
4014           if (i_ndb->getEventOperation() == NULL &&
4015               s_ndb->getEventOperation() == NULL &&
4016               do_ndbcluster_binlog_close_connection == BCCC_running)
4017           {
4018             DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
4019             do_ndbcluster_binlog_close_connection= BCCC_restart;
4020             if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running)
4021             {
4022               sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
4023                               "as latest received epoch is %lu",
4024                               (ulong) *p_latest_trans_gci,
4025                               (ulong) ndb_latest_received_binlog_epoch);
4026             }
4027           }
4028         }
4029         else
4030           sql_print_error("NDB: error %lu (%s) on handling "
4031                           "binlog schema event",
4032                           (ulong) pOp->getNdbError().code,
4033                           pOp->getNdbError().message);
4034         pOp= s_ndb->nextEvent();
4035       }
4036     }
4037 
4038     if (res > 0)
4039     {
4040       DBUG_PRINT("info", ("pollEvents res: %d", res));
4041       thd->proc_info= "Processing events";
4042       NdbEventOperation *pOp= i_ndb->nextEvent();
4043       ndb_binlog_index_row row;
4044       while (pOp != NULL)
4045       {
4046 #ifdef RUN_NDB_BINLOG_TIMER
4047         Timer gci_timer, write_timer;
4048         int event_count= 0;
4049         gci_timer.start();
4050 #endif
4051         gci= pOp->getGCI();
4052         DBUG_PRINT("info", ("Handling gci: %d", (unsigned)gci));
4053         // sometimes get TE_ALTER with invalid table
4054         DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
4055                     ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
4056         DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch);
4057 
4058         /* initialize some variables for this epoch */
4059         g_ndb_log_slave_updates= opt_log_slave_updates;
4060         i_ndb->
4061           setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
4062         i_ndb->setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
4063 
4064         bzero((char*) &row, sizeof(row));
4065         thd->variables.character_set_client= &my_charset_latin1;
4066         injector::transaction trans;
4067         // pass table map before epoch
4068         {
4069           Uint32 iter= 0;
4070           const NdbEventOperation *gci_op;
4071           Uint32 event_types;
4072           while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
4073                  != NULL)
4074           {
4075             NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData();
4076             DBUG_PRINT("info", ("per gci_op: 0x%lx  share: 0x%lx  event_types: 0x%x",
4077                                 (long) gci_op, (long) share, event_types));
4078             // workaround for interface returning TE_STOP events
4079             // which are normally filtered out below in the nextEvent loop
4080             if ((event_types & ~NdbDictionary::Event::TE_STOP) == 0)
4081             {
4082               DBUG_PRINT("info", ("Skipped TE_STOP on table %s",
4083                                   gci_op->getEvent()->getTable()->getName()));
4084               continue;
4085             }
4086             // this should not happen
4087             if (share == NULL || share->table == NULL)
4088             {
4089               DBUG_PRINT("info", ("no share or table %s!",
4090                                   gci_op->getEvent()->getTable()->getName()));
4091               continue;
4092             }
4093             if (share == ndb_apply_status_share)
4094             {
4095               // skip this table, it is handled specially
4096               continue;
4097             }
4098             TABLE *table= share->table;
4099 #ifndef DBUG_OFF
4100             const LEX_STRING &name= table->s->table_name;
4101 #endif
4102             if ((event_types & (NdbDictionary::Event::TE_INSERT |
4103                                 NdbDictionary::Event::TE_UPDATE |
4104                                 NdbDictionary::Event::TE_DELETE)) == 0)
4105             {
4106               DBUG_PRINT("info", ("skipping non data event table: %.*s",
4107                                   (int) name.length, name.str));
4108               continue;
4109             }
4110             if (!trans.good())
4111             {
4112               DBUG_PRINT("info",
4113                          ("Found new data event, initializing transaction"));
4114               inj->new_trans(thd, &trans);
4115             }
4116             DBUG_PRINT("info", ("use_table: %.*s",
4117                                 (int) name.length, name.str));
4118             injector::transaction::table tbl(table, TRUE);
4119             int ret __attribute__((unused))= trans.use_table(::server_id, tbl);
4120             DBUG_ASSERT(ret == 0);
4121           }
4122         }
4123         if (trans.good())
4124         {
4125           if (ndb_apply_status_share)
4126           {
4127             TABLE *table= ndb_apply_status_share->table;
4128 
4129 #ifndef DBUG_OFF
4130             const LEX_STRING& name= table->s->table_name;
4131             DBUG_PRINT("info", ("use_table: %.*s",
4132                                 (int) name.length, name.str));
4133 #endif
4134             injector::transaction::table tbl(table, TRUE);
4135             int ret __attribute__((unused))= trans.use_table(::server_id, tbl);
4136             DBUG_ASSERT(ret == 0);
4137 
4138 	    /*
4139 	       Initialize table->record[0]
4140 	    */
4141 	    empty_record(table);
4142 
4143             table->field[0]->store((longlong)::server_id);
4144             table->field[1]->store((longlong)gci);
4145             table->field[2]->store("", 0, &my_charset_bin);
4146             table->field[3]->store((longlong)0);
4147             table->field[4]->store((longlong)0);
4148             trans.write_row(::server_id,
4149                             injector::transaction::table(table, TRUE),
4150                             &table->s->all_set, table->s->fields,
4151                             table->record[0]);
4152           }
4153           else
4154           {
4155             sql_print_error("NDB: Could not get apply status share");
4156           }
4157         }
4158 #ifdef RUN_NDB_BINLOG_TIMER
4159         write_timer.start();
4160 #endif
4161         do
4162         {
4163 #ifdef RUN_NDB_BINLOG_TIMER
4164           event_count++;
4165 #endif
4166           if (pOp->hasError() &&
4167               ndb_binlog_thread_handle_error(i_ndb, pOp, row) < 0)
4168             goto err;
4169 
4170 #ifndef DBUG_OFF
4171           {
4172             NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
4173             DBUG_PRINT("info",
4174                        ("EVENT TYPE: %d  GCI: %ld  last applied: %ld  "
4175                         "share: 0x%lx (%s.%s)", pOp->getEventType(),
4176                         (long) gci,
4177                         (long) ndb_latest_applied_binlog_epoch,
4178                         (long) share,
4179                         share ? share->db :  "'NULL'",
4180                         share ? share->table_name : "'NULL'"));
4181             DBUG_ASSERT(share != 0);
4182           }
4183           // assert that there is consistency between gci op list
4184           // and event list
4185           {
4186             Uint32 iter= 0;
4187             const NdbEventOperation *gci_op;
4188             Uint32 event_types;
4189             while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
4190                    != NULL)
4191             {
4192               if (gci_op == pOp)
4193                 break;
4194             }
4195             DBUG_ASSERT(gci_op == pOp);
4196             DBUG_ASSERT((event_types & pOp->getEventType()) != 0);
4197           }
4198 #endif
4199           if ((unsigned) pOp->getEventType() <
4200               (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
4201             ndb_binlog_thread_handle_data_event(i_ndb, pOp, row, trans);
4202           else
4203           {
4204             // set injector_ndb database/schema from table internal name
4205             int ret __attribute__((unused))=
4206               i_ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
4207             DBUG_ASSERT(ret == 0);
4208             ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, row);
4209             // reset to catch errors
4210             i_ndb->setDatabaseName("");
4211             DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
4212                                 s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
4213                                 "<empty>"));
4214             DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
4215                                 i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
4216                                 "<empty>"));
4217             if (i_ndb->getEventOperation() == NULL &&
4218                 s_ndb->getEventOperation() == NULL &&
4219                 do_ndbcluster_binlog_close_connection == BCCC_running)
4220             {
4221               DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
4222               do_ndbcluster_binlog_close_connection= BCCC_restart;
4223               if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running)
4224               {
4225                 sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
4226                                 "as latest received epoch is %lu",
4227                                 (ulong) *p_latest_trans_gci,
4228                                 (ulong) ndb_latest_received_binlog_epoch);
4229               }
4230             }
4231           }
4232 
4233           pOp= i_ndb->nextEvent();
4234         } while (pOp && pOp->getGCI() == gci);
4235 
        /*
          note! pOp is now referring to an event in the next epoch
          or is == 0
        */
4240 #ifdef RUN_NDB_BINLOG_TIMER
4241         write_timer.stop();
4242 #endif
4243 
4244         if (trans.good())
4245         {
4246           //DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes);
4247           thd->proc_info= "Committing events to binlog";
4248           injector::transaction::binlog_pos start= trans.start_pos();
4249           if (int r= trans.commit())
4250           {
4251             sql_print_error("NDB Binlog: "
4252                             "Error during COMMIT of GCI. Error: %d",
4253                             r);
4254             /* TODO: Further handling? */
4255           }
4256           row.gci= gci;
4257           row.master_log_file= start.file_name();
4258           row.master_log_pos= start.file_pos();
4259 
4260           DBUG_PRINT("info", ("COMMIT gci: %lu", (ulong) gci));
4261           if (ndb_update_ndb_binlog_index)
4262             ndb_add_ndb_binlog_index(thd, &row);
4263           ndb_latest_applied_binlog_epoch= gci;
4264         }
4265         ndb_latest_handled_binlog_epoch= gci;
4266 #ifdef RUN_NDB_BINLOG_TIMER
4267         gci_timer.stop();
4268         sql_print_information("gci %ld event_count %d write time "
4269                               "%ld(%d e/s), total time %ld(%d e/s)",
4270                               (ulong)gci, event_count,
4271                               write_timer.elapsed_ms(),
4272                               (1000*event_count) / write_timer.elapsed_ms(),
4273                               gci_timer.elapsed_ms(),
4274                               (1000*event_count) / gci_timer.elapsed_ms());
4275 #endif
4276       }
4277     }
4278 
4279     ndb_binlog_thread_handle_schema_event_post_epoch(thd,
4280                                                      &post_epoch_log_list,
4281                                                      &post_epoch_unlock_list);
4282     free_root(&mem_root, MYF(0));
4283     *root_ptr= old_root;
4284     ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
4285   }
4286   if (do_ndbcluster_binlog_close_connection == BCCC_restart)
4287   {
4288     ndb_binlog_tables_inited= FALSE;
4289     trans_commit_stmt(thd);
4290     close_thread_tables(thd);
4291     thd->mdl_context.release_transactional_locks();
4292     ndb_binlog_index= 0;
4293     goto restart;
4294   }
4295 err:
4296   sql_print_information("Stopping Cluster Binlog");
4297   DBUG_PRINT("info",("Shutting down cluster binlog thread"));
4298   thd->proc_info= "Shutting down";
4299   thd->stmt_da->can_overwrite_status= TRUE;
4300   thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
4301   thd->stmt_da->can_overwrite_status= FALSE;
4302   close_thread_tables(thd);
4303   thd->mdl_context.release_transactional_locks();
4304   mysql_mutex_lock(&injector_mutex);
4305   /* don't mess with the injector_ndb anymore from other threads */
4306   injector_thd= 0;
4307   injector_ndb= 0;
4308   p_latest_trans_gci= 0;
4309   schema_ndb= 0;
4310   mysql_mutex_unlock(&injector_mutex);
4311   thd->db= 0; // as not to try to free memory
4312 
4313   if (ndb_apply_status_share)
4314   {
4315     /* ndb_share reference binlog extra free */
4316     DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
4317                              ndb_apply_status_share->key,
4318                              ndb_apply_status_share->use_count));
4319     free_share(&ndb_apply_status_share);
4320     ndb_apply_status_share= 0;
4321   }
4322   if (ndb_schema_share)
4323   {
4324     /* begin protect ndb_schema_share */
4325     mysql_mutex_lock(&ndb_schema_share_mutex);
4326     /* ndb_share reference binlog extra free */
4327     DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
4328                              ndb_schema_share->key,
4329                              ndb_schema_share->use_count));
4330     free_share(&ndb_schema_share);
4331     ndb_schema_share= 0;
4332     ndb_binlog_tables_inited= 0;
4333     mysql_mutex_unlock(&ndb_schema_share_mutex);
4334     /* end protect ndb_schema_share */
4335   }
4336 
4337   /* remove all event operations */
4338   if (s_ndb)
4339   {
4340     NdbEventOperation *op;
4341     DBUG_PRINT("info",("removing all event operations"));
4342     while ((op= s_ndb->getEventOperation()))
4343     {
4344       DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
4345       DBUG_PRINT("info",("removing event operation on %s",
4346                          op->getEvent()->getName()));
4347       NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
4348       DBUG_ASSERT(share != 0);
4349       DBUG_ASSERT(share->op == op ||
4350                   share->op_old == op);
4351       share->op= share->op_old= 0;
4352       /* ndb_share reference binlog free */
4353       DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
4354                                share->key, share->use_count));
4355       free_share(&share);
4356       s_ndb->dropEventOperation(op);
4357     }
4358     delete s_ndb;
4359     s_ndb= 0;
4360   }
4361   if (i_ndb)
4362   {
4363     NdbEventOperation *op;
4364     DBUG_PRINT("info",("removing all event operations"));
4365     while ((op= i_ndb->getEventOperation()))
4366     {
4367       DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
4368       DBUG_PRINT("info",("removing event operation on %s",
4369                          op->getEvent()->getName()));
4370       NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
4371       DBUG_ASSERT(share != 0);
4372       DBUG_ASSERT(share->op == op ||
4373                   share->op_old == op);
4374       share->op= share->op_old= 0;
4375       /* ndb_share reference binlog free */
4376       DBUG_PRINT("NDB_SHARE", ("%s binlog free  use_count: %u",
4377                                share->key, share->use_count));
4378       free_share(&share);
4379       i_ndb->dropEventOperation(op);
4380     }
4381     delete i_ndb;
4382     i_ndb= 0;
4383   }
4384 
4385   my_hash_free(&ndb_schema_objects);
4386 
4387   net_end(&thd->net);
4388   thd->cleanup();
4389   delete thd;
4390 
4391   ndb_binlog_thread_running= -1;
4392   ndb_binlog_running= FALSE;
4393   mysql_cond_signal(&injector_cond);
4394 
4395   DBUG_PRINT("exit", ("ndb_binlog_thread"));
4396 
4397   DBUG_LEAVE;                               // Must match DBUG_ENTER()
4398   my_thread_end();
4399   pthread_exit(0);
4400   return NULL;                              // Avoid compiler warnings
4401 }
4402 
4403 bool
ndbcluster_show_status_binlog(THD * thd,stat_print_fn * stat_print,enum ha_stat_type stat_type)4404 ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
4405                               enum ha_stat_type stat_type)
4406 {
4407   char buf[IO_SIZE];
4408   uint buflen;
4409   ulonglong ndb_latest_epoch= 0;
4410   DBUG_ENTER("ndbcluster_show_status_binlog");
4411 
4412   mysql_mutex_lock(&injector_mutex);
4413   if (injector_ndb)
4414   {
4415     char buff1[22],buff2[22],buff3[22],buff4[22],buff5[22];
4416     ndb_latest_epoch= injector_ndb->getLatestGCI();
4417     mysql_mutex_unlock(&injector_mutex);
4418 
4419     buflen=
4420       snprintf(buf, sizeof(buf),
4421                "latest_epoch=%s, "
4422                "latest_trans_epoch=%s, "
4423                "latest_received_binlog_epoch=%s, "
4424                "latest_handled_binlog_epoch=%s, "
4425                "latest_applied_binlog_epoch=%s",
4426                llstr(ndb_latest_epoch, buff1),
4427                llstr(*p_latest_trans_gci, buff2),
4428                llstr(ndb_latest_received_binlog_epoch, buff3),
4429                llstr(ndb_latest_handled_binlog_epoch, buff4),
4430                llstr(ndb_latest_applied_binlog_epoch, buff5));
4431     if (stat_print(thd, ndbcluster_hton_name, ndbcluster_hton_name_length,
4432                    "binlog", strlen("binlog"),
4433                    buf, buflen))
4434       DBUG_RETURN(TRUE);
4435   }
4436   else
4437     mysql_mutex_unlock(&injector_mutex);
4438   DBUG_RETURN(FALSE);
4439 }
4440 
4441 #endif /* HAVE_NDB_BINLOG */
4442 #endif
4443