1 /*
2 Copyright (c) 2006, 2015, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17
18 #include "sql_priv.h"
19 #include "unireg.h" // REQUIRED: for other includes
20 #include "sql_show.h"
21 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
22 #include "ha_ndbcluster.h"
23
24 #ifdef HAVE_NDB_BINLOG
25 #include "rpl_injector.h"
26 #include "rpl_filter.h"
27 #include "slave.h"
28 #include "log_event.h"
29 #include "ha_ndbcluster_binlog.h"
30 #include "NdbDictionary.hpp"
31 #include "ndb_cluster_connection.hpp"
32 #include <util/NdbAutoPtr.hpp>
33
34 #include "sql_base.h" // close_thread_tables
35 #include "sql_table.h" // build_table_filename
36 #include "table.h" // open_table_from_share
37 #include "discover.h" // readfrm, writefrm
38 #include "lock.h" // MYSQL_LOCK_IGNORE_FLUSH,
39 // mysql_unlock_tables
40 #include "sql_parse.h" // mysql_parse
41 #include "transaction.h"
42
#ifdef ndb_dynamite
/*
  Debug aid (only when ndb_dynamite is defined): replace assert() with a
  version that prints the file, line and failing expression to stdout,
  flushes stdout, restores the default SIGABRT handler and aborts.
  The kill() calls after abort() (signals 6 and 9) act as a fallback in
  case the process is still alive.
*/
#undef assert
#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
#endif
47
extern my_bool opt_ndb_log_binlog_index;
/* Non-zero enables extra informational messages in the server log. */
extern ulong opt_ndb_extra_logging;
/*
  defines for cluster replication table names
*/
#include "ha_ndbcluster_tables.h"
/*
  Keys of the replication tables' shares in ndbcluster_open_tables
  ("./<db>/<table>"); used by the share lookups further down.
*/
#define NDB_APPLY_TABLE_FILE "./" NDB_REP_DB "/" NDB_APPLY_TABLE
#define NDB_SCHEMA_TABLE_FILE "./" NDB_REP_DB "/" NDB_SCHEMA_TABLE

/*
  Timeout for syncing schema events between
  mysql servers, and between mysql server and the binlog.
  NOTE(review): the unit is not visible here (presumably seconds).
*/
static const int DEFAULT_SYNC_TIMEOUT= 120;
62
63
/*
  Flag showing if the ndb injector thread is running, if so == 1
  -1 if it was started but later stopped for some reason
  0 if never started
  ndbcluster_binlog_end() waits on injector_cond while it is > 0.
*/
static int ndb_binlog_thread_running= 0;

/*
  Flag showing if the ndb binlog should be created, if so == TRUE
  FALSE if not
*/
my_bool ndb_binlog_running= FALSE;
/*
  Set to TRUE by ndbcluster_setup_binlog_table_shares() once the
  replication tables have shares and all NDB table files were found.
*/
my_bool ndb_binlog_tables_inited= FALSE;
77
/*
  Global reference to the ndb injector thread's THD object.

  Used as the in_use owner of TABLE objects created for NDB shares
  (e.g. in get_share(...) and ndbcluster_binlog_open_table()), and by
  run_query() to detect that it is running on the injector thread.
*/
THD *injector_thd= 0;

/*
  Global reference to the ndb injector thread's Ndb object.

  Used mainly by the binlog index thread, but exposed to the client sql
  thread for one reason: to set up the event operations for a table
  so that the ndb injector thread receives events.

  Must therefore always be used with a surrounding
  mysql_mutex_lock(&injector_mutex), when doing create/dropEventOperation
*/
static Ndb *injector_ndb= 0;
/* NOTE(review): presumably the Ndb object used for schema events; its users are outside this view. */
static Ndb *schema_ndb= 0;

/*
  Non-zero while the binlog part is initialized. ndbcluster_binlog_end()
  returns immediately when it is 0 and clears it otherwise, so shutdown
  runs only once.
*/
static int ndbcluster_binlog_inited= 0;
/*
  Flag "ndbcluster_binlog_terminating" set when shutting down mysqld.
  Server main loop should call handlerton function:

  ndbcluster_hton->binlog_func ==
  ndbcluster_binlog_func(...,BFN_BINLOG_END,...) ==
  ndbcluster_binlog_end

  at shutdown, which sets the flag. And then server needs to wait for it
  to complete. Otherwise binlog will not be complete.

  ndbcluster_hton->panic == ndbcluster_end() will not return until
  ndb binlog is completed
*/
static int ndbcluster_binlog_terminating= 0;

/*
  Mutex and condition used for interacting between client sql thread
  and injector thread
*/
pthread_t ndb_binlog_thread;
mysql_mutex_t injector_mutex;
mysql_cond_t injector_cond;

/* NDB Injector thread (used for binlog creation) */
static ulonglong ndb_latest_applied_binlog_epoch= 0;
/* Polled by ndbcluster_binlog_wait() to decide when the binlog has caught up. */
static ulonglong ndb_latest_handled_binlog_epoch= 0;
static ulonglong ndb_latest_received_binlog_epoch= 0;

/*
  Shares of the replication tables; 0 until the tables are available
  (see ndbcluster_binlog_init_share() and
  ndbcluster_setup_binlog_table_shares()).
*/
NDB_SHARE *ndb_apply_status_share= 0;
NDB_SHARE *ndb_schema_share= 0;
/* Destroyed in ndbcluster_binlog_end(). */
mysql_mutex_t ndb_schema_share_mutex;

extern my_bool opt_log_slave_updates;
static my_bool g_ndb_log_slave_updates;
135
/* Schema object distribution handling */
HASH ndb_schema_objects;
typedef struct st_ndb_schema_object {
  mysql_mutex_t mutex;
  /* NOTE(review): presumably the hash key in ndb_schema_objects; confirm in ndb_get_schema_object(). */
  char *key;
  uint key_length;
  uint use_count;
  /* NOTE(review): presumably initialized over slock[] below; confirm. */
  MY_BITMAP slock_bitmap;
  uint32 slock[256/32]; // 256 bits for lock status of table
} NDB_SCHEMA_OBJECT;
static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
                                                my_bool create_if_not_exists,
                                                my_bool have_lock);
static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
                                   bool have_lock);

/*
  ndbcluster_binlog_wait() dereferences this, without a NULL check,
  whenever ndb_binlog_running is set, and waits for the epoch it holds.
*/
static Uint64 *p_latest_trans_gci= 0;

/*
  Global variables for holding the ndb_binlog_index table reference
  (run_query() resets ndb_binlog_index to 0 when run on injector_thd,
  since the query closes all tables).
*/
static TABLE *ndb_binlog_index= 0;
static TABLE_LIST binlog_tables;
159
160 /*
161 Helper functions
162 */
163
164 #ifndef DBUG_OFF
165 /* purecov: begin deadcode */
print_records(TABLE * table,const uchar * record)166 static void print_records(TABLE *table, const uchar *record)
167 {
168 for (uint j= 0; j < table->s->fields; j++)
169 {
170 char buf[40];
171 int pos= 0;
172 Field *field= table->field[j];
173 const uchar* field_ptr= field->ptr - table->record[0] + record;
174 int pack_len= field->pack_length();
175 int n= pack_len < 10 ? pack_len : 10;
176
177 for (int i= 0; i < n && pos < 20; i++)
178 {
179 pos+= sprintf(&buf[pos]," %x", (int) (uchar) field_ptr[i]);
180 }
181 buf[pos]= 0;
182 DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
183 }
184 }
185 /* purecov: end */
186 #else
187 #define print_records(a,b)
188 #endif
189
190
191 #ifndef DBUG_OFF
/*
  Debug trace of a TABLE. Prints "<info>: (null)" when table is 0.
  Otherwise it prints:
  - the share's db/table name, field count and record lengths;
  - the addresses of record[0] and record[1];
  - for every field: its name, flags, real type and pack length, and the
    offsets of its value and null byte relative to record[0];
  - for BIT fields, additionally the bit-storage pointer, offset and length.
*/
static void dbug_print_table(const char *info, TABLE *table)
{
  if (table == 0)
  {
    DBUG_PRINT("info",("%s: (null)", info));
    return;
  }
  DBUG_PRINT("info",
             ("%s: %s.%s s->fields: %d  "
              "reclength: %lu  rec_buff_length: %u  record[0]: 0x%lx  "
              "record[1]: 0x%lx",
              info,
              table->s->db.str,
              table->s->table_name.str,
              table->s->fields,
              table->s->reclength,
              table->s->rec_buff_length,
              (long) table->record[0],
              (long) table->record[1]));

  for (unsigned int i= 0; i < table->s->fields; i++)
  {
    Field *f= table->field[i];
    DBUG_PRINT("info",
               ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d  pack_length: %d  "
                "ptr: 0x%lx[+%d]  null_bit: %u  null_ptr: 0x%lx[+%d]",
                i,
                f->field_name,
                (long) f->flags,
                (f->flags & PRI_KEY_FLAG)  ? "pri"       : "attr",
                (f->flags & NOT_NULL_FLAG) ? ""          : ",nullable",
                (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
                (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
                (f->flags & BLOB_FLAG)     ? ",blob"     : "",
                (f->flags & BINARY_FLAG)   ? ",binary"   : "",
                f->real_type(),
                f->pack_length(),
                (long) f->ptr, (int) (f->ptr - table->record[0]),
                f->null_bit,
                (long) f->null_ptr,
                (int) ((uchar*) f->null_ptr - table->record[0])));
    if (f->type() == MYSQL_TYPE_BIT)
    {
      Field_bit *g= (Field_bit*) f;
      DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d  bit_ptr: 0x%lx[+%d] "
                                   "bit_ofs: %d  bit_len: %u",
                                   g->field_length, (long) g->bit_ptr,
                                   (int) ((uchar*) g->bit_ptr -
                                          table->record[0]),
                                   g->bit_ofs, g->bit_len));
    }
  }
}
245 #else
246 #define dbug_print_table(a,b)
247 #endif
248
249
/*
  Run a query through mysql_parse

  Used, for example, to:
  - purge rows from the ndb_binlog_index table
  - empty the ndb_apply_status table (RESET SLAVE)
  - create the ndb_apply_status and ndb_schema tables
*/
/*
  Parameters:
    thd             thread to run the statement on; its query, pseudo
                    thread id, status vars, transaction state, option bits
                    and NET are saved here and restored afterwards
    buf, end        statement text, [buf, end)
    no_print_error  0-terminated list of error codes that should not be
                    logged; when NULL, nothing is logged at all
    disable_binlog  when TRUE, OPTION_BIN_LOG is cleared for the statement
*/
static void run_query(THD *thd, char *buf, char *end,
                      const int *no_print_error, my_bool disable_binlog)
{
  /* Save the THD state that running a nested statement overwrites. */
  ulong save_thd_query_length= thd->query_length();
  char *save_thd_query= thd->query();
  ulong save_thread_id= thd->variables.pseudo_thread_id;
  struct system_status_var save_thd_status_var= thd->status_var;
  THD_TRANS save_thd_transaction_all= thd->transaction.all;
  THD_TRANS save_thd_transaction_stmt= thd->transaction.stmt;
  ulonglong save_thd_options= thd->variables.option_bits;
  DBUG_ASSERT(sizeof(save_thd_options) == sizeof(thd->variables.option_bits));
  NET save_thd_net= thd->net;

  /* Run the statement with a zeroed NET; the saved one is restored below. */
  bzero((char*) &thd->net, sizeof(NET));
  thd->set_query(buf, (uint) (end - buf));
  /* NOTE(review): `thread_id` is not declared in this view; presumably the server-global thread id counter. */
  thd->variables.pseudo_thread_id= thread_id;
  thd->transaction.stmt.modified_non_trans_table= FALSE;
  if (disable_binlog)
    thd->variables.option_bits&= ~OPTION_BIN_LOG;
    
  DBUG_PRINT("query", ("%s", thd->query()));

  DBUG_ASSERT(!thd->in_sub_stmt);
  DBUG_ASSERT(!thd->locked_tables_mode);

  {
    Parser_state parser_state;
    if (!parser_state.init(thd, thd->query(), thd->query_length()))
      mysql_parse(thd, thd->query(), thd->query_length(), &parser_state);
  }

  /*
    On failure, log the error unless either the NDB error code or the SQL
    errno appears in no_print_error.
  */
  if (no_print_error && thd->is_slave_error)
  {
    int i;
    Thd_ndb *thd_ndb= get_thd_ndb(thd);
    for (i= 0; no_print_error[i]; i++)
      if ((thd_ndb->m_error_code == no_print_error[i]) ||
          (thd->stmt_da->sql_errno() == (unsigned) no_print_error[i]))
        break;
    if (!no_print_error[i])
      sql_print_error("NDB: %s: error %s %d(ndb: %d) %d %d",
                      buf,
                      thd->stmt_da->message(),
                      thd->stmt_da->sql_errno(),
                      thd_ndb->m_error_code,
                      (int) thd->is_error(), thd->is_slave_error);
  }
  /*
    XXX: this code is broken. mysql_parse()/mysql_reset_thd_for_next_command()
    can not be called from within a statement, and
    run_query() can be called from anywhere, including from within
    a sub-statement.
    This particular reset is a temporary hack to avoid an assert
    for double assignment of the diagnostics area when run_query()
    is called from ndbcluster_reset_logs(), which is called from
    mysql_flush().
  */
  thd->stmt_da->reset_diagnostics_area();

  /*
    Restore the saved state. Note that the binlog format is not restored:
    the THD is always left in row-based format.
  */
  thd->variables.option_bits= save_thd_options;
  thd->set_query(save_thd_query, save_thd_query_length);
  thd->variables.pseudo_thread_id= save_thread_id;
  thd->status_var= save_thd_status_var;
  thd->transaction.all= save_thd_transaction_all;
  thd->transaction.stmt= save_thd_transaction_stmt;
  thd->net= save_thd_net;
  thd->set_current_stmt_binlog_format_row();

  if (thd == injector_thd)
  {
    /*
      running the query will close all tables, including the ndb_binlog_index
      used in injector_thd
    */
    ndb_binlog_index= 0;
  }
}
334
335 static void
ndbcluster_binlog_close_table(THD * thd,NDB_SHARE * share)336 ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
337 {
338 DBUG_ENTER("ndbcluster_binlog_close_table");
339 if (share->table_share)
340 {
341 closefrm(share->table, 1);
342 share->table_share= 0;
343 share->table= 0;
344 }
345 DBUG_ASSERT(share->table == 0);
346 DBUG_VOID_RETURN;
347 }
348
349
350 /*
351 Creates a TABLE object for the ndb cluster table
352
353 NOTES
354 This does not open the underlying table
355 */
356
357 static int
ndbcluster_binlog_open_table(THD * thd,NDB_SHARE * share,TABLE_SHARE * table_share,TABLE * table,int reopen)358 ndbcluster_binlog_open_table(THD *thd, NDB_SHARE *share,
359 TABLE_SHARE *table_share, TABLE *table,
360 int reopen)
361 {
362 int error;
363 DBUG_ENTER("ndbcluster_binlog_open_table");
364
365 init_tmp_table_share(thd, table_share, share->db, 0, share->table_name,
366 share->key);
367 if ((error= open_table_def(thd, table_share, 0)))
368 {
369 DBUG_PRINT("error", ("open_table_def failed: %d my_errno: %d", error, my_errno));
370 free_table_share(table_share);
371 DBUG_RETURN(error);
372 }
373 if ((error= open_table_from_share(thd, table_share, "", 0 /* fon't allocate buffers */,
374 (uint) READ_ALL, 0, table, FALSE)))
375 {
376 DBUG_PRINT("error", ("open_table_from_share failed %d my_errno: %d", error, my_errno));
377 free_table_share(table_share);
378 DBUG_RETURN(error);
379 }
380 mysql_mutex_lock(&LOCK_open);
381 assign_new_table_id(table_share);
382 mysql_mutex_unlock(&LOCK_open);
383
384 if (!reopen)
385 {
386 // allocate memory on ndb share so it can be reused after online alter table
387 (void)multi_alloc_root(&share->mem_root,
388 &(share->record[0]), table->s->rec_buff_length,
389 &(share->record[1]), table->s->rec_buff_length,
390 NULL);
391 }
392 {
393 my_ptrdiff_t row_offset= share->record[0] - table->record[0];
394 Field **p_field;
395 for (p_field= table->field; *p_field; p_field++)
396 (*p_field)->move_field_offset(row_offset);
397 table->record[0]= share->record[0];
398 table->record[1]= share->record[1];
399 }
400
401 table->in_use= injector_thd;
402
403 table->s->db.str= share->db;
404 table->s->db.length= strlen(share->db);
405 table->s->table_name.str= share->table_name;
406 table->s->table_name.length= strlen(share->table_name);
407
408 DBUG_ASSERT(share->table_share == 0);
409 share->table_share= table_share;
410 DBUG_ASSERT(share->table == 0);
411 share->table= table;
412 /* We can't use 'use_all_columns()' as the file object is not setup yet */
413 table->column_bitmaps_set_no_signal(&table->s->all_set, &table->s->all_set);
414 #ifndef DBUG_OFF
415 dbug_print_table("table", table);
416 #endif
417 DBUG_RETURN(0);
418 }
419
420
421 /*
422 Initialize the binlog part of the NDB_SHARE
423 */
/*
  Parameters:
    share   share to initialize (op and table are reset to 0)
    _table  open TABLE to take the hidden-PK / blob flags from when no
            event operation is needed; may be 0, which marks the share
            NSF_NO_BINLOG

  All allocations are made on share->mem_root; their results are not
  checked.
*/
int ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
{
  THD *thd= current_thd;
  MEM_ROOT *mem_root= &share->mem_root;
  /* Event operations are needed whenever the binlog is running ... */
  int do_event_op= ndb_binlog_running;
  int error= 0;
  DBUG_ENTER("ndbcluster_binlog_init_share");

  share->connect_count= g_ndb_cluster_connection->get_connect_count();

  share->op= 0;
  share->table= 0;

  /* ... and always for the schema/apply-status tables until their global shares exist. */
  if (!ndb_schema_share &&
      strcmp(share->db, NDB_REP_DB) == 0 &&
      strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
    do_event_op= 1;
  else if (!ndb_apply_status_share &&
           strcmp(share->db, NDB_REP_DB) == 0 &&
           strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
    do_event_op= 1;

  /* One cleared bitmap of max_ndb_nodes bits per data node. */
  {
    int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
    share->subscriber_bitmap= (MY_BITMAP*)
      alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
    for (i= 0; i < no_nodes; i++)
    {
      bitmap_init(&share->subscriber_bitmap[i],
                  (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
                  max_ndb_nodes, FALSE);
      bitmap_clear_all(&share->subscriber_bitmap[i]);
    }
  }

  if (!do_event_op)
  {
    if (_table)
    {
      if (_table->s->primary_key == MAX_KEY)
        share->flags|= NSF_HIDDEN_PK;
      if (_table->s->blob_fields != 0)
        share->flags|= NSF_BLOB_FLAG;
    }
    else
    {
      share->flags|= NSF_NO_BINLOG;
    }
    DBUG_RETURN(error);
  }
  /* Single-pass "loop": `break` is used to leave early on failure. */
  while (1) 
  {
    /*
      NOTE(review): this `error` shadows the function-level `error`, so a
      failure from ndbcluster_binlog_open_table() is NOT propagated and
      the function returns 0. That looks unintentional. Changing it
      alters what callers observe, so verify against the callers first.
    */
    int error;
    TABLE_SHARE *table_share= (TABLE_SHARE *) alloc_root(mem_root, sizeof(*table_share));
    TABLE *table= (TABLE*) alloc_root(mem_root, sizeof(*table));
    if ((error= ndbcluster_binlog_open_table(thd, share, table_share, table, 0)))
      break;
    /*
      ! do not touch the contents of the table
      it may be in use by the injector thread
    */
    /* Shadows the outer mem_root with the same value. */
    MEM_ROOT *mem_root= &share->mem_root;
    share->ndb_value[0]= (NdbValue*)
      alloc_root(mem_root, sizeof(NdbValue) *
                 (table->s->fields + 2 /*extra for hidden key and part key*/));
    share->ndb_value[1]= (NdbValue*)
      alloc_root(mem_root, sizeof(NdbValue) *
                 (table->s->fields + 2 /*extra for hidden key and part key*/));

    if (table->s->primary_key == MAX_KEY)
      share->flags|= NSF_HIDDEN_PK;
    if (table->s->blob_fields != 0)
      share->flags|= NSF_BLOB_FLAG;
    break;
  }
  DBUG_RETURN(error);
}
501
502 /*****************************************************************
503 functions called from master sql client threads
504 ****************************************************************/
505
/*
  Called from mysql_show_binlog_events, reset_logs and FLUSH LOGS to make
  sure that all events originating from this mysql server have arrived in
  the binlog.

  Waits until the epoch containing the last transaction has been handled.

  Waits at most about 30 seconds.
*/
ndbcluster_binlog_wait(THD * thd)514 static void ndbcluster_binlog_wait(THD *thd)
515 {
516 if (ndb_binlog_running)
517 {
518 DBUG_ENTER("ndbcluster_binlog_wait");
519 const char *save_info= thd ? thd->proc_info : 0;
520 ulonglong wait_epoch= *p_latest_trans_gci;
521 int count= 30;
522 if (thd)
523 thd->proc_info= "Waiting for ndbcluster binlog update to "
524 "reach current position";
525 while (count && ndb_binlog_running &&
526 ndb_latest_handled_binlog_epoch < wait_epoch)
527 {
528 count--;
529 sleep(1);
530 }
531 if (thd)
532 thd->proc_info= save_info;
533 DBUG_VOID_RETURN;
534 }
535 }
536
537 /*
538 Called from MYSQL_BIN_LOG::reset_logs in log.cc when binlog is emptied
539 */
ndbcluster_reset_logs(THD * thd)540 static int ndbcluster_reset_logs(THD *thd)
541 {
542 if (!ndb_binlog_running)
543 return 0;
544
545 DBUG_ENTER("ndbcluster_reset_logs");
546
547 /*
548 Wait for all events orifinating from this mysql server has
549 reached the binlog before continuing to reset
550 */
551 ndbcluster_binlog_wait(thd);
552
553 char buf[1024];
554 char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_REP_TABLE);
555
556 run_query(thd, buf, end, NULL, TRUE);
557
558 DBUG_RETURN(0);
559 }
560
561 /*
562 Called from MYSQL_BIN_LOG::purge_logs in log.cc when the binlog "file"
563 is removed
564 */
565
566 static int
ndbcluster_binlog_index_purge_file(THD * thd,const char * file)567 ndbcluster_binlog_index_purge_file(THD *thd, const char *file)
568 {
569 if (!ndb_binlog_running || thd->slave_thread)
570 return 0;
571
572 DBUG_ENTER("ndbcluster_binlog_index_purge_file");
573 DBUG_PRINT("enter", ("file: %s", file));
574
575 char buf[1024];
576 char *end= strmov(strmov(strmov(buf,
577 "DELETE FROM "
578 NDB_REP_DB "." NDB_REP_TABLE
579 " WHERE File='"), file), "'");
580
581 run_query(thd, buf, end, NULL, TRUE);
582
583 DBUG_RETURN(0);
584 }
585
586 static void
ndbcluster_binlog_log_query(handlerton * hton,THD * thd,enum_binlog_command binlog_command,const char * query,uint query_length,const char * db,const char * table_name)587 ndbcluster_binlog_log_query(handlerton *hton, THD *thd, enum_binlog_command binlog_command,
588 const char *query, uint query_length,
589 const char *db, const char *table_name)
590 {
591 DBUG_ENTER("ndbcluster_binlog_log_query");
592 DBUG_PRINT("enter", ("db: %s table_name: %s query: %s",
593 db, table_name, query));
594 enum SCHEMA_OP_TYPE type;
595 int log= 0;
596 switch (binlog_command)
597 {
598 case LOGCOM_CREATE_TABLE:
599 type= SOT_CREATE_TABLE;
600 DBUG_ASSERT(FALSE);
601 break;
602 case LOGCOM_ALTER_TABLE:
603 type= SOT_ALTER_TABLE;
604 log= 1;
605 break;
606 case LOGCOM_RENAME_TABLE:
607 type= SOT_RENAME_TABLE;
608 DBUG_ASSERT(FALSE);
609 break;
610 case LOGCOM_DROP_TABLE:
611 type= SOT_DROP_TABLE;
612 DBUG_ASSERT(FALSE);
613 break;
614 case LOGCOM_CREATE_DB:
615 type= SOT_CREATE_DB;
616 log= 1;
617 break;
618 case LOGCOM_ALTER_DB:
619 type= SOT_ALTER_DB;
620 log= 1;
621 break;
622 case LOGCOM_DROP_DB:
623 type= SOT_DROP_DB;
624 DBUG_ASSERT(FALSE);
625 break;
626 }
627 if (log)
628 {
629 ndbcluster_log_schema_op(thd, 0, query, query_length,
630 db, table_name, 0, 0, type,
631 0, 0);
632 }
633 DBUG_VOID_RETURN;
634 }
635
636
637 /*
638 End use of the NDB Cluster binlog
639 - wait for binlog thread to shutdown
640 */
641
/*
  Returns 0 in all cases. Only the first call after initialization does any
  work, because ndbcluster_binlog_inited is cleared here.
*/
static int ndbcluster_binlog_end(THD *thd)
{
  DBUG_ENTER("ndbcluster_binlog_end");

  if (!ndbcluster_binlog_inited)
    DBUG_RETURN(0);
  ndbcluster_binlog_inited= 0;

#ifdef HAVE_NDB_BINLOG
  if (ndb_util_thread_running > 0)
  {
    /*
      Wait for util thread to die (as this uses the injector mutex)
      There is a very small chance that ndb_util_thread dies and the
      following mutex is freed before it's accessed. This shouldn't
      however be a likely case as the ndbcluster_binlog_end is supposed to
      be called before ndb_cluster_end().
    */
    mysql_mutex_lock(&LOCK_ndb_util_thread);
    /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
    ndb_util_thread_running++;
    ndbcluster_terminating= 1;
    mysql_cond_signal(&COND_ndb_util_thread);
    /* Wait until only our own increment above remains. */
    while (ndb_util_thread_running > 1)
      mysql_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
    ndb_util_thread_running--;
    mysql_mutex_unlock(&LOCK_ndb_util_thread);
  }

  /*
    wait for injector thread to finish
    (the terminating flag is set before injector_mutex is taken)
  */
  ndbcluster_binlog_terminating= 1;
  mysql_mutex_lock(&injector_mutex);
  mysql_cond_signal(&injector_cond);
  while (ndb_binlog_thread_running > 0)
    mysql_cond_wait(&injector_cond, &injector_mutex);
  mysql_mutex_unlock(&injector_mutex);

  /* No thread uses these any more: destroy them. */
  mysql_mutex_destroy(&injector_mutex);
  mysql_cond_destroy(&injector_cond);
  mysql_mutex_destroy(&ndb_schema_share_mutex);
#endif

  DBUG_RETURN(0);
}
686
687 /*****************************************************************
688 functions called from slave sql client threads
689 ****************************************************************/
ndbcluster_reset_slave(THD * thd)690 static void ndbcluster_reset_slave(THD *thd)
691 {
692 if (!ndb_binlog_running)
693 return;
694
695 DBUG_ENTER("ndbcluster_reset_slave");
696 char buf[1024];
697 char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
698 run_query(thd, buf, end, NULL, TRUE);
699 DBUG_VOID_RETURN;
700 }
701
/*
  Handlerton callbacks for the binlog part of the ndb handlerton; they are
  installed by ndbcluster_binlog_init_handlerton() below.
*/
705
706 /**
707 Upon the sql command flush logs, we need to ensure that all outstanding
708 ndb data to be logged has made it to the binary log to get a deterministic
709 behavior on the rotation of the log.
710 */
ndbcluster_flush_logs(handlerton * hton)711 static bool ndbcluster_flush_logs(handlerton *hton)
712 {
713 ndbcluster_binlog_wait(current_thd);
714 return FALSE;
715 }
716
ndbcluster_binlog_func(handlerton * hton,THD * thd,enum_binlog_func fn,void * arg)717 static int ndbcluster_binlog_func(handlerton *hton, THD *thd,
718 enum_binlog_func fn,
719 void *arg)
720 {
721 switch(fn)
722 {
723 case BFN_RESET_LOGS:
724 ndbcluster_reset_logs(thd);
725 break;
726 case BFN_RESET_SLAVE:
727 ndbcluster_reset_slave(thd);
728 break;
729 case BFN_BINLOG_WAIT:
730 ndbcluster_binlog_wait(thd);
731 break;
732 case BFN_BINLOG_END:
733 ndbcluster_binlog_end(thd);
734 break;
735 case BFN_BINLOG_PURGE_FILE:
736 ndbcluster_binlog_index_purge_file(thd, (const char *)arg);
737 break;
738 }
739 return 0;
740 }
741
ndbcluster_binlog_init_handlerton()742 void ndbcluster_binlog_init_handlerton()
743 {
744 handlerton *h= ndbcluster_hton;
745 h->flush_logs= ndbcluster_flush_logs;
746 h->binlog_func= ndbcluster_binlog_func;
747 h->binlog_log_query= ndbcluster_binlog_log_query;
748 }
749
750
751
752
753
754 /*
755 check the availability af the ndb_apply_status share
756 - return share, but do not increase refcount
757 - return 0 if there is no share
758 */
ndbcluster_check_ndb_apply_status_share()759 static NDB_SHARE *ndbcluster_check_ndb_apply_status_share()
760 {
761 mysql_mutex_lock(&ndbcluster_mutex);
762
763 void *share= my_hash_search(&ndbcluster_open_tables,
764 (uchar*) NDB_APPLY_TABLE_FILE,
765 sizeof(NDB_APPLY_TABLE_FILE) - 1);
766 DBUG_PRINT("info",("ndbcluster_check_ndb_apply_status_share %s 0x%lx",
767 NDB_APPLY_TABLE_FILE, (long) share));
768 mysql_mutex_unlock(&ndbcluster_mutex);
769 return (NDB_SHARE*) share;
770 }
771
772 /*
773 check the availability af the schema share
774 - return share, but do not increase refcount
775 - return 0 if there is no share
776 */
ndbcluster_check_ndb_schema_share()777 static NDB_SHARE *ndbcluster_check_ndb_schema_share()
778 {
779 mysql_mutex_lock(&ndbcluster_mutex);
780
781 void *share= my_hash_search(&ndbcluster_open_tables,
782 (uchar*) NDB_SCHEMA_TABLE_FILE,
783 sizeof(NDB_SCHEMA_TABLE_FILE) - 1);
784 DBUG_PRINT("info",("ndbcluster_check_ndb_schema_share %s 0x%lx",
785 NDB_SCHEMA_TABLE_FILE, (long) share));
786 mysql_mutex_unlock(&ndbcluster_mutex);
787 return (NDB_SHARE*) share;
788 }
789
790 /*
791 Create the ndb_apply_status table
792 */
ndbcluster_create_ndb_apply_status_table(THD * thd)793 static int ndbcluster_create_ndb_apply_status_table(THD *thd)
794 {
795 DBUG_ENTER("ndbcluster_create_ndb_apply_status_table");
796
797 /*
798 Check if we already have the apply status table.
799 If so it should have been discovered at startup
800 and thus have a share
801 */
802
803 if (ndbcluster_check_ndb_apply_status_share())
804 DBUG_RETURN(0);
805
806 if (g_ndb_cluster_connection->get_no_ready() <= 0)
807 DBUG_RETURN(0);
808
809 char buf[1024 + 1], *end;
810
811 if (opt_ndb_extra_logging)
812 sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_APPLY_TABLE);
813
814 /*
815 Check if apply status table exists in MySQL "dictionary"
816 if so, remove it since there is none in Ndb
817 */
818 {
819 build_table_filename(buf, sizeof(buf) - 1,
820 NDB_REP_DB, NDB_APPLY_TABLE, reg_ext, 0);
821 mysql_file_delete(key_file_frm, buf, MYF(0));
822 }
823
824 /*
825 Note, updating this table schema must be reflected in ndb_restore
826 */
827 end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
828 NDB_REP_DB "." NDB_APPLY_TABLE
829 " ( server_id INT UNSIGNED NOT NULL,"
830 " epoch BIGINT UNSIGNED NOT NULL, "
831 " log_name VARCHAR(255) BINARY NOT NULL, "
832 " start_pos BIGINT UNSIGNED NOT NULL, "
833 " end_pos BIGINT UNSIGNED NOT NULL, "
834 " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB CHARACTER SET latin1");
835
836 const int no_print_error[6]= {ER_TABLE_EXISTS_ERROR,
837 701,
838 702,
839 721, // Table already exist
840 4009,
841 0}; // do not print error 701 etc
842 run_query(thd, buf, end, no_print_error, TRUE);
843
844 DBUG_RETURN(0);
845 }
846
847
848 /*
849 Create the schema table
850 */
ndbcluster_create_schema_table(THD * thd)851 static int ndbcluster_create_schema_table(THD *thd)
852 {
853 DBUG_ENTER("ndbcluster_create_schema_table");
854
855 /*
856 Check if we already have the schema table.
857 If so it should have been discovered at startup
858 and thus have a share
859 */
860
861 if (ndbcluster_check_ndb_schema_share())
862 DBUG_RETURN(0);
863
864 if (g_ndb_cluster_connection->get_no_ready() <= 0)
865 DBUG_RETURN(0);
866
867 char buf[1024 + 1], *end;
868
869 if (opt_ndb_extra_logging)
870 sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_SCHEMA_TABLE);
871
872 /*
873 Check if schema table exists in MySQL "dictionary"
874 if so, remove it since there is none in Ndb
875 */
876 {
877 build_table_filename(buf, sizeof(buf) - 1,
878 NDB_REP_DB, NDB_SCHEMA_TABLE, reg_ext, 0);
879 mysql_file_delete(key_file_frm, buf, MYF(0));
880 }
881
882 /*
883 Update the defines below to reflect the table schema
884 */
885 end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
886 NDB_REP_DB "." NDB_SCHEMA_TABLE
887 " ( db VARBINARY(63) NOT NULL,"
888 " name VARBINARY(63) NOT NULL,"
889 " slock BINARY(32) NOT NULL,"
890 " query BLOB NOT NULL,"
891 " node_id INT UNSIGNED NOT NULL,"
892 " epoch BIGINT UNSIGNED NOT NULL,"
893 " id INT UNSIGNED NOT NULL,"
894 " version INT UNSIGNED NOT NULL,"
895 " type INT UNSIGNED NOT NULL,"
896 " PRIMARY KEY USING HASH (db,name) ) ENGINE=NDB CHARACTER SET latin1");
897
898 const int no_print_error[6]= {ER_TABLE_EXISTS_ERROR,
899 701,
900 702,
901 721, // Table already exist
902 4009,
903 0}; // do not print error 701 etc
904 run_query(thd, buf, end, no_print_error, TRUE);
905
906 DBUG_RETURN(0);
907 }
908
ndbcluster_setup_binlog_table_shares(THD * thd)909 int ndbcluster_setup_binlog_table_shares(THD *thd)
910 {
911 if (!ndb_schema_share &&
912 ndbcluster_check_ndb_schema_share() == 0)
913 {
914 ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
915 if (!ndb_schema_share)
916 {
917 ndbcluster_create_schema_table(thd);
918 // always make sure we create the 'schema' first
919 if (!ndb_schema_share)
920 return 1;
921 }
922 }
923 if (!ndb_apply_status_share &&
924 ndbcluster_check_ndb_apply_status_share() == 0)
925 {
926 ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
927 if (!ndb_apply_status_share)
928 {
929 ndbcluster_create_ndb_apply_status_table(thd);
930 if (!ndb_apply_status_share)
931 return 1;
932 }
933 }
934 if (!ndbcluster_find_all_files(thd))
935 {
936 ndb_binlog_tables_inited= TRUE;
937 if (opt_ndb_extra_logging)
938 sql_print_information("NDB Binlog: ndb tables writable");
939 close_cached_tables(NULL, NULL, FALSE, LONG_TIMEOUT);
940 /* Signal injector thread that all is setup */
941 mysql_cond_signal(&injector_cond);
942 }
943 return 0;
944 }
945
946 /*
947 Defines and struct for schema table.
948 Should reflect table definition above.
949 */
/* Column positions in the schema table, in CREATE TABLE order. */
#define SCHEMA_DB_I 0u
#define SCHEMA_NAME_I 1u
#define SCHEMA_SLOCK_I 2u
#define SCHEMA_QUERY_I 3u
#define SCHEMA_NODE_ID_I 4u
#define SCHEMA_EPOCH_I 5u
#define SCHEMA_ID_I 6u
#define SCHEMA_VERSION_I 7u
#define SCHEMA_TYPE_I 8u
/* Number of columns. */
#define SCHEMA_SIZE 9u
/* Size in bytes of the BINARY(32) slock column. */
#define SCHEMA_SLOCK_SIZE 32u

/*
  In-memory copy of one schema table row, filled by ndbcluster_get_schema().
*/
struct Cluster_schema
{
  uchar db_length;
  char db[64];                 /* VARBINARY(63) value + terminating NUL */
  uchar name_length;
  char name[64];               /* VARBINARY(63) value + terminating NUL */
  uchar slock_length;
  uint32 slock[SCHEMA_SLOCK_SIZE/4]; /* raw bytes of the slock column */
  unsigned short query_length;
  char *query;                 /* copy made with sql_strmake() */
  Uint64 epoch;
  uint32 node_id;
  uint32 id;
  uint32 version;
  uint32 type;
  uint32 any_value;            /* not filled in by ndbcluster_get_schema() */
};
979
print_could_not_discover_error(THD * thd,const Cluster_schema * schema)980 static void print_could_not_discover_error(THD *thd,
981 const Cluster_schema *schema)
982 {
983 sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
984 "binlog schema event '%s' from node %d. "
985 "my_errno: %d",
986 schema->db, schema->name, schema->query,
987 schema->node_id, my_errno);
988 List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
989 MYSQL_ERROR *err;
990 while ((err= it++))
991 sql_print_warning("NDB Binlog: (%d)%s", err->get_sql_errno(),
992 err->get_message_text());
993 }
994
995 /*
996 Transfer schema table data into corresponding struct
997 */
ndbcluster_get_schema(NDB_SHARE * share,Cluster_schema * s)998 static void ndbcluster_get_schema(NDB_SHARE *share,
999 Cluster_schema *s)
1000 {
1001 TABLE *table= share->table;
1002 Field **field;
1003 /* unpack blob values */
1004 uchar* blobs_buffer= 0;
1005 uint blobs_buffer_size= 0;
1006 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
1007 {
1008 ptrdiff_t ptrdiff= 0;
1009 int ret= get_ndb_blobs_value(table, share->ndb_value[0],
1010 blobs_buffer, blobs_buffer_size,
1011 ptrdiff);
1012 if (ret != 0)
1013 {
1014 my_free(blobs_buffer);
1015 DBUG_PRINT("info", ("blob read error"));
1016 DBUG_ASSERT(FALSE);
1017 }
1018 }
1019 /* db varchar 1 length uchar */
1020 field= table->field;
1021 s->db_length= *(uint8*)(*field)->ptr;
1022 DBUG_ASSERT(s->db_length <= (*field)->field_length);
1023 DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
1024 memcpy(s->db, (*field)->ptr + 1, s->db_length);
1025 s->db[s->db_length]= 0;
1026 /* name varchar 1 length uchar */
1027 field++;
1028 s->name_length= *(uint8*)(*field)->ptr;
1029 DBUG_ASSERT(s->name_length <= (*field)->field_length);
1030 DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
1031 memcpy(s->name, (*field)->ptr + 1, s->name_length);
1032 s->name[s->name_length]= 0;
1033 /* slock fixed length */
1034 field++;
1035 s->slock_length= (*field)->field_length;
1036 DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
1037 memcpy(s->slock, (*field)->ptr, s->slock_length);
1038 /* query blob */
1039 field++;
1040 {
1041 Field_blob *field_blob= (Field_blob*)(*field);
1042 uint blob_len= field_blob->get_length((*field)->ptr);
1043 uchar *blob_ptr= 0;
1044 field_blob->get_ptr(&blob_ptr);
1045 DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
1046 s->query_length= blob_len;
1047 s->query= sql_strmake((char*) blob_ptr, blob_len);
1048 }
1049 /* node_id */
1050 field++;
1051 s->node_id= ((Field_long *)*field)->val_int();
1052 /* epoch */
1053 field++;
1054 s->epoch= ((Field_long *)*field)->val_int();
1055 /* id */
1056 field++;
1057 s->id= ((Field_long *)*field)->val_int();
1058 /* version */
1059 field++;
1060 s->version= ((Field_long *)*field)->val_int();
1061 /* type */
1062 field++;
1063 s->type= ((Field_long *)*field)->val_int();
1064 /* free blobs buffer */
1065 my_free(blobs_buffer);
1066 dbug_tmp_restore_column_map(table->read_set, old_map);
1067 }
1068
1069 /*
1070 helper function to pack a ndb varchar
1071 */
ndb_pack_varchar(const NDBCOL * col,char * buf,const char * str,int sz)1072 char *ndb_pack_varchar(const NDBCOL *col, char *buf,
1073 const char *str, int sz)
1074 {
1075 switch (col->getArrayType())
1076 {
1077 case NDBCOL::ArrayTypeFixed:
1078 memcpy(buf, str, sz);
1079 break;
1080 case NDBCOL::ArrayTypeShortVar:
1081 *(uchar*)buf= (uchar)sz;
1082 memcpy(buf + 1, str, sz);
1083 break;
1084 case NDBCOL::ArrayTypeMediumVar:
1085 int2store(buf, sz);
1086 memcpy(buf + 2, str, sz);
1087 break;
1088 }
1089 return buf;
1090 }
1091
1092 /*
1093 acknowledge handling of schema operation
1094 */
1095 static int
ndbcluster_update_slock(THD * thd,const char * db,const char * table_name)1096 ndbcluster_update_slock(THD *thd,
1097 const char *db,
1098 const char *table_name)
1099 {
1100 DBUG_ENTER("ndbcluster_update_slock");
1101 if (!ndb_schema_share)
1102 {
1103 DBUG_RETURN(0);
1104 }
1105
1106 const NdbError *ndb_error= 0;
1107 uint32 node_id= g_ndb_cluster_connection->node_id();
1108 Ndb *ndb= check_ndb_in_thd(thd);
1109 char save_db[FN_HEADLEN];
1110 strcpy(save_db, ndb->getDatabaseName());
1111
1112 char tmp_buf[FN_REFLEN];
1113 NDBDICT *dict= ndb->getDictionary();
1114 ndb->setDatabaseName(NDB_REP_DB);
1115 Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
1116 const NDBTAB *ndbtab= ndbtab_g.get_table();
1117 NdbTransaction *trans= 0;
1118 int retries= 100;
1119 int retry_sleep= 10; /* 10 milliseconds, transaction */
1120 const NDBCOL *col[SCHEMA_SIZE];
1121 unsigned sz[SCHEMA_SIZE];
1122
1123 MY_BITMAP slock;
1124 uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
1125 bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
1126
1127 if (ndbtab == 0)
1128 {
1129 abort();
1130 DBUG_RETURN(0);
1131 }
1132
1133 {
1134 uint i;
1135 for (i= 0; i < SCHEMA_SIZE; i++)
1136 {
1137 col[i]= ndbtab->getColumn(i);
1138 if (i != SCHEMA_QUERY_I)
1139 {
1140 sz[i]= col[i]->getLength();
1141 DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
1142 }
1143 }
1144 }
1145
1146 while (1)
1147 {
1148 if ((trans= ndb->startTransaction()) == 0)
1149 goto err;
1150 {
1151 NdbOperation *op= 0;
1152 int r= 0;
1153
1154 /* read the bitmap exlusive */
1155 r|= (op= trans->getNdbOperation(ndbtab)) == 0;
1156 DBUG_ASSERT(r == 0);
1157 r|= op->readTupleExclusive();
1158 DBUG_ASSERT(r == 0);
1159
1160 /* db */
1161 ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
1162 r|= op->equal(SCHEMA_DB_I, tmp_buf);
1163 DBUG_ASSERT(r == 0);
1164 /* name */
1165 ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
1166 strlen(table_name));
1167 r|= op->equal(SCHEMA_NAME_I, tmp_buf);
1168 DBUG_ASSERT(r == 0);
1169 /* slock */
1170 r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
1171 DBUG_ASSERT(r == 0);
1172 }
1173 if (trans->execute(NdbTransaction::NoCommit))
1174 goto err;
1175 bitmap_clear_bit(&slock, node_id);
1176 {
1177 NdbOperation *op= 0;
1178 int r= 0;
1179
1180 /* now update the tuple */
1181 r|= (op= trans->getNdbOperation(ndbtab)) == 0;
1182 DBUG_ASSERT(r == 0);
1183 r|= op->updateTuple();
1184 DBUG_ASSERT(r == 0);
1185
1186 /* db */
1187 ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
1188 r|= op->equal(SCHEMA_DB_I, tmp_buf);
1189 DBUG_ASSERT(r == 0);
1190 /* name */
1191 ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
1192 strlen(table_name));
1193 r|= op->equal(SCHEMA_NAME_I, tmp_buf);
1194 DBUG_ASSERT(r == 0);
1195 /* slock */
1196 r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
1197 DBUG_ASSERT(r == 0);
1198 /* node_id */
1199 r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
1200 DBUG_ASSERT(r == 0);
1201 /* type */
1202 r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
1203 DBUG_ASSERT(r == 0);
1204 }
1205 if (trans->execute(NdbTransaction::Commit) == 0)
1206 {
1207 dict->forceGCPWait();
1208 DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
1209 node_id, db, table_name));
1210 break;
1211 }
1212 err:
1213 const NdbError *this_error= trans ?
1214 &trans->getNdbError() : &ndb->getNdbError();
1215 if (this_error->status == NdbError::TemporaryError)
1216 {
1217 if (retries--)
1218 {
1219 if (trans)
1220 ndb->closeTransaction(trans);
1221 my_sleep(retry_sleep);
1222 continue; // retry
1223 }
1224 }
1225 ndb_error= this_error;
1226 break;
1227 }
1228
1229 if (ndb_error)
1230 {
1231 char buf[1024];
1232 my_snprintf(buf, sizeof(buf), "Could not release lock on '%s.%s'",
1233 db, table_name);
1234 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1235 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
1236 ndb_error->code, ndb_error->message, buf);
1237 }
1238 if (trans)
1239 ndb->closeTransaction(trans);
1240 ndb->setDatabaseName(save_db);
1241 DBUG_RETURN(0);
1242 }
1243
1244 /*
1245 log query in schema table
1246 */
ndb_report_waiting(const char * key,int the_time,const char * op,const char * obj)1247 static void ndb_report_waiting(const char *key,
1248 int the_time,
1249 const char *op,
1250 const char *obj)
1251 {
1252 ulonglong ndb_latest_epoch= 0;
1253 const char *proc_info= "<no info>";
1254 mysql_mutex_lock(&injector_mutex);
1255 if (injector_ndb)
1256 ndb_latest_epoch= injector_ndb->getLatestGCI();
1257 if (injector_thd)
1258 proc_info= injector_thd->proc_info;
1259 mysql_mutex_unlock(&injector_mutex);
1260 sql_print_information("NDB %s:"
1261 " waiting max %u sec for %s %s."
1262 " epochs: (%u,%u,%u)"
1263 " injector proc_info: %s"
1264 ,key, the_time, op, obj
1265 ,(uint)ndb_latest_handled_binlog_epoch
1266 ,(uint)ndb_latest_received_binlog_epoch
1267 ,(uint)ndb_latest_epoch
1268 ,proc_info
1269 );
1270 }
1271
ndbcluster_log_schema_op(THD * thd,NDB_SHARE * share,const char * query,int query_length,const char * db,const char * table_name,uint32 ndb_table_id,uint32 ndb_table_version,enum SCHEMA_OP_TYPE type,const char * new_db,const char * new_table_name)1272 int ndbcluster_log_schema_op(THD *thd, NDB_SHARE *share,
1273 const char *query, int query_length,
1274 const char *db, const char *table_name,
1275 uint32 ndb_table_id,
1276 uint32 ndb_table_version,
1277 enum SCHEMA_OP_TYPE type,
1278 const char *new_db, const char *new_table_name)
1279 {
1280 DBUG_ENTER("ndbcluster_log_schema_op");
1281 Thd_ndb *thd_ndb= get_thd_ndb(thd);
1282 if (!thd_ndb)
1283 {
1284 if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
1285 {
1286 sql_print_error("Could not allocate Thd_ndb object");
1287 DBUG_RETURN(1);
1288 }
1289 set_thd_ndb(thd, thd_ndb);
1290 }
1291
1292 DBUG_PRINT("enter",
1293 ("query: %s db: %s table_name: %s thd_ndb->options: %d",
1294 query, db, table_name, thd_ndb->options));
1295 if (!ndb_schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
1296 {
1297 DBUG_RETURN(0);
1298 }
1299
1300 char tmp_buf2[FN_REFLEN];
1301 char quoted_table1[2 + 2 * FN_REFLEN + 1];
1302 char quoted_db1[2 + 2 * FN_REFLEN + 1];
1303 char quoted_db2[2 + 2 * FN_REFLEN + 1];
1304 char quoted_table2[2 + 2 * FN_REFLEN + 1];
1305 int id_length= 0;
1306 const char *type_str;
1307 switch (type)
1308 {
1309 case SOT_DROP_TABLE:
1310 /* drop database command, do not log at drop table */
1311 if (thd->lex->sql_command == SQLCOM_DROP_DB)
1312 DBUG_RETURN(0);
1313 /* redo the drop table query as is may contain several tables */
1314 query= tmp_buf2;
1315 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
1316 table_name, 0);
1317 quoted_table1[id_length]= '\0';
1318 query_length= (uint) (strxmov(tmp_buf2, "drop table ",
1319 quoted_table1, NullS) - tmp_buf2);
1320 type_str= "drop table";
1321 break;
1322 case SOT_RENAME_TABLE:
1323 /* redo the rename table query as is may contain several tables */
1324 query= tmp_buf2;
1325 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db1,
1326 db, 0);
1327 quoted_db1[id_length]= '\0';
1328 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
1329 table_name, 0);
1330 quoted_table1[id_length]= '\0';
1331 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db2,
1332 new_db, 0);
1333 quoted_db2[id_length]= '\0';
1334 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table2,
1335 new_table_name, 0);
1336 quoted_table2[id_length]= '\0';
1337 query_length= (uint) (strxmov(tmp_buf2, "rename table ",
1338 quoted_db1, ".", quoted_table1, " to ",
1339 quoted_db2, ".", quoted_table2, NullS) - tmp_buf2);
1340 type_str= "rename table";
1341 break;
1342 case SOT_CREATE_TABLE:
1343 type_str= "create table";
1344 break;
1345 case SOT_ALTER_TABLE:
1346 type_str= "alter table";
1347 break;
1348 case SOT_DROP_DB:
1349 type_str= "drop db";
1350 break;
1351 case SOT_CREATE_DB:
1352 type_str= "create db";
1353 break;
1354 case SOT_ALTER_DB:
1355 type_str= "alter db";
1356 break;
1357 case SOT_TABLESPACE:
1358 type_str= "tablespace";
1359 break;
1360 case SOT_LOGFILE_GROUP:
1361 type_str= "logfile group";
1362 break;
1363 case SOT_TRUNCATE_TABLE:
1364 type_str= "truncate table";
1365 break;
1366 default:
1367 abort(); /* should not happen, programming error */
1368 }
1369
1370 NDB_SCHEMA_OBJECT *ndb_schema_object;
1371 {
1372 char key[FN_REFLEN + 1];
1373 build_table_filename(key, sizeof(key) - 1, db, table_name, "", 0);
1374 ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE);
1375 }
1376
1377 const NdbError *ndb_error= 0;
1378 uint32 node_id= g_ndb_cluster_connection->node_id();
1379 Uint64 epoch= 0;
1380 MY_BITMAP schema_subscribers;
1381 uint32 bitbuf[sizeof(ndb_schema_object->slock)/4];
1382 char bitbuf_e[sizeof(bitbuf)];
1383 bzero(bitbuf_e, sizeof(bitbuf_e));
1384 {
1385 int i, updated= 0;
1386 int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
1387 bitmap_init(&schema_subscribers, bitbuf, sizeof(bitbuf)*8, FALSE);
1388 bitmap_set_all(&schema_subscribers);
1389
1390 /* begin protect ndb_schema_share */
1391 mysql_mutex_lock(&ndb_schema_share_mutex);
1392 if (ndb_schema_share == 0)
1393 {
1394 mysql_mutex_unlock(&ndb_schema_share_mutex);
1395 if (ndb_schema_object)
1396 ndb_free_schema_object(&ndb_schema_object, FALSE);
1397 DBUG_RETURN(0);
1398 }
1399 mysql_mutex_lock(&ndb_schema_share->mutex);
1400 for (i= 0; i < no_storage_nodes; i++)
1401 {
1402 MY_BITMAP *table_subscribers= &ndb_schema_share->subscriber_bitmap[i];
1403 if (!bitmap_is_clear_all(table_subscribers))
1404 {
1405 bitmap_intersect(&schema_subscribers,
1406 table_subscribers);
1407 updated= 1;
1408 }
1409 }
1410 mysql_mutex_unlock(&ndb_schema_share->mutex);
1411 mysql_mutex_unlock(&ndb_schema_share_mutex);
1412 /* end protect ndb_schema_share */
1413
1414 if (updated)
1415 {
1416 bitmap_clear_bit(&schema_subscribers, node_id);
1417 /*
1418 if setting own acknowledge bit it is important that
1419 no other mysqld's are registred, as subsequent code
1420 will cause the original event to be hidden (by blob
1421 merge event code)
1422 */
1423 if (bitmap_is_clear_all(&schema_subscribers))
1424 bitmap_set_bit(&schema_subscribers, node_id);
1425 }
1426 else
1427 bitmap_clear_all(&schema_subscribers);
1428
1429 if (ndb_schema_object)
1430 {
1431 mysql_mutex_lock(&ndb_schema_object->mutex);
1432 memcpy(ndb_schema_object->slock, schema_subscribers.bitmap,
1433 sizeof(ndb_schema_object->slock));
1434 mysql_mutex_unlock(&ndb_schema_object->mutex);
1435 }
1436
1437 DBUG_DUMP("schema_subscribers", (uchar*)schema_subscribers.bitmap,
1438 no_bytes_in_map(&schema_subscribers));
1439 DBUG_PRINT("info", ("bitmap_is_clear_all(&schema_subscribers): %d",
1440 bitmap_is_clear_all(&schema_subscribers)));
1441 }
1442
1443 Ndb *ndb= thd_ndb->ndb;
1444 char save_db[FN_REFLEN];
1445 strcpy(save_db, ndb->getDatabaseName());
1446
1447 char tmp_buf[FN_REFLEN];
1448 NDBDICT *dict= ndb->getDictionary();
1449 ndb->setDatabaseName(NDB_REP_DB);
1450 Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
1451 const NDBTAB *ndbtab= ndbtab_g.get_table();
1452 NdbTransaction *trans= 0;
1453 int retries= 100;
1454 int retry_sleep= 10; /* 10 milliseconds, transaction */
1455 const NDBCOL *col[SCHEMA_SIZE];
1456 unsigned sz[SCHEMA_SIZE];
1457
1458 if (ndbtab == 0)
1459 {
1460 if (strcmp(NDB_REP_DB, db) != 0 ||
1461 strcmp(NDB_SCHEMA_TABLE, table_name))
1462 {
1463 ndb_error= &dict->getNdbError();
1464 }
1465 goto end;
1466 }
1467
1468 {
1469 uint i;
1470 for (i= 0; i < SCHEMA_SIZE; i++)
1471 {
1472 col[i]= ndbtab->getColumn(i);
1473 if (i != SCHEMA_QUERY_I)
1474 {
1475 sz[i]= col[i]->getLength();
1476 DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
1477 }
1478 }
1479 }
1480
1481 while (1)
1482 {
1483 const char *log_db= db;
1484 const char *log_tab= table_name;
1485 const char *log_subscribers= (char*)schema_subscribers.bitmap;
1486 uint32 log_type= (uint32)type;
1487 if ((trans= ndb->startTransaction()) == 0)
1488 goto err;
1489 while (1)
1490 {
1491 NdbOperation *op= 0;
1492 int r= 0;
1493 r|= (op= trans->getNdbOperation(ndbtab)) == 0;
1494 DBUG_ASSERT(r == 0);
1495 r|= op->writeTuple();
1496 DBUG_ASSERT(r == 0);
1497
1498 /* db */
1499 ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db));
1500 r|= op->equal(SCHEMA_DB_I, tmp_buf);
1501 DBUG_ASSERT(r == 0);
1502 /* name */
1503 ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
1504 strlen(log_tab));
1505 r|= op->equal(SCHEMA_NAME_I, tmp_buf);
1506 DBUG_ASSERT(r == 0);
1507 /* slock */
1508 DBUG_ASSERT(sz[SCHEMA_SLOCK_I] == sizeof(bitbuf));
1509 r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
1510 DBUG_ASSERT(r == 0);
1511 /* query */
1512 {
1513 NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
1514 DBUG_ASSERT(ndb_blob != 0);
1515 uint blob_len= query_length;
1516 const char* blob_ptr= query;
1517 r|= ndb_blob->setValue(blob_ptr, blob_len);
1518 DBUG_ASSERT(r == 0);
1519 }
1520 /* node_id */
1521 r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
1522 DBUG_ASSERT(r == 0);
1523 /* epoch */
1524 r|= op->setValue(SCHEMA_EPOCH_I, epoch);
1525 DBUG_ASSERT(r == 0);
1526 /* id */
1527 r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
1528 DBUG_ASSERT(r == 0);
1529 /* version */
1530 r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
1531 DBUG_ASSERT(r == 0);
1532 /* type */
1533 r|= op->setValue(SCHEMA_TYPE_I, log_type);
1534 DBUG_ASSERT(r == 0);
1535 /* any value */
1536 if (!(thd->variables.option_bits & OPTION_BIN_LOG))
1537 r|= op->setAnyValue(NDB_ANYVALUE_FOR_NOLOGGING);
1538 else
1539 r|= op->setAnyValue(thd->server_id);
1540 DBUG_ASSERT(r == 0);
1541 if (log_db != new_db && new_db && new_table_name)
1542 {
1543 log_db= new_db;
1544 log_tab= new_table_name;
1545 log_subscribers= bitbuf_e; // no ack expected on this
1546 log_type= (uint32)SOT_RENAME_TABLE_NEW;
1547 continue;
1548 }
1549 break;
1550 }
1551 if (trans->execute(NdbTransaction::Commit) == 0)
1552 {
1553 DBUG_PRINT("info", ("logged: %s", query));
1554 break;
1555 }
1556 err:
1557 const NdbError *this_error= trans ?
1558 &trans->getNdbError() : &ndb->getNdbError();
1559 if (this_error->status == NdbError::TemporaryError)
1560 {
1561 if (retries--)
1562 {
1563 if (trans)
1564 ndb->closeTransaction(trans);
1565 my_sleep(retry_sleep);
1566 continue; // retry
1567 }
1568 }
1569 ndb_error= this_error;
1570 break;
1571 }
1572 end:
1573 if (ndb_error)
1574 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
1575 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
1576 ndb_error->code,
1577 ndb_error->message,
1578 "Could not log query '%s' on other mysqld's");
1579
1580 if (trans)
1581 ndb->closeTransaction(trans);
1582 ndb->setDatabaseName(save_db);
1583
1584 /*
1585 Wait for other mysqld's to acknowledge the table operation
1586 */
1587 if (ndb_error == 0 &&
1588 !bitmap_is_clear_all(&schema_subscribers))
1589 {
1590 /*
1591 if own nodeid is set we are a single mysqld registred
1592 as an optimization we update the slock directly
1593 */
1594 if (bitmap_is_set(&schema_subscribers, node_id))
1595 ndbcluster_update_slock(thd, db, table_name);
1596 else
1597 dict->forceGCPWait();
1598
1599 int max_timeout= DEFAULT_SYNC_TIMEOUT;
1600 mysql_mutex_lock(&ndb_schema_object->mutex);
1601 while (1)
1602 {
1603 struct timespec abstime;
1604 int i;
1605 int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
1606 set_timespec(abstime, 1);
1607 int ret= mysql_cond_timedwait(&injector_cond,
1608 &ndb_schema_object->mutex,
1609 &abstime);
1610 if (thd->killed)
1611 break;
1612
1613 /* begin protect ndb_schema_share */
1614 mysql_mutex_lock(&ndb_schema_share_mutex);
1615 if (ndb_schema_share == 0)
1616 {
1617 mysql_mutex_unlock(&ndb_schema_share_mutex);
1618 break;
1619 }
1620 mysql_mutex_lock(&ndb_schema_share->mutex);
1621 for (i= 0; i < no_storage_nodes; i++)
1622 {
1623 /* remove any unsubscribed from schema_subscribers */
1624 MY_BITMAP *tmp= &ndb_schema_share->subscriber_bitmap[i];
1625 if (!bitmap_is_clear_all(tmp))
1626 bitmap_intersect(&schema_subscribers, tmp);
1627 }
1628 mysql_mutex_unlock(&ndb_schema_share->mutex);
1629 mysql_mutex_unlock(&ndb_schema_share_mutex);
1630 /* end protect ndb_schema_share */
1631
1632 /* remove any unsubscribed from ndb_schema_object->slock */
1633 bitmap_intersect(&ndb_schema_object->slock_bitmap, &schema_subscribers);
1634
1635 DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
1636 (uchar*)ndb_schema_object->slock_bitmap.bitmap,
1637 no_bytes_in_map(&ndb_schema_object->slock_bitmap));
1638
1639 if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
1640 break;
1641
1642 if (ret)
1643 {
1644 max_timeout--;
1645 if (max_timeout == 0)
1646 {
1647 sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
1648 type_str, ndb_schema_object->key);
1649 break;
1650 }
1651 if (opt_ndb_extra_logging)
1652 ndb_report_waiting(type_str, max_timeout,
1653 "distributing", ndb_schema_object->key);
1654 }
1655 }
1656 mysql_mutex_unlock(&ndb_schema_object->mutex);
1657 }
1658
1659 if (ndb_schema_object)
1660 ndb_free_schema_object(&ndb_schema_object, FALSE);
1661
1662 DBUG_RETURN(0);
1663 }
1664
1665 /*
1666 Handle _non_ data events from the storage nodes
1667 */
1668 int
ndb_handle_schema_change(THD * thd,Ndb * ndb,NdbEventOperation * pOp,NDB_SHARE * share)1669 ndb_handle_schema_change(THD *thd, Ndb *ndb, NdbEventOperation *pOp,
1670 NDB_SHARE *share)
1671 {
1672 DBUG_ENTER("ndb_handle_schema_change");
1673 TABLE* table= share->table;
1674 TABLE_SHARE *table_share= share->table_share;
1675 const char *dbname= table_share->db.str;
1676 const char *tabname= table_share->table_name.str;
1677 bool do_close_cached_tables= FALSE;
1678 bool is_online_alter_table= FALSE;
1679 bool is_rename_table= FALSE;
1680 bool is_remote_change=
1681 (uint) pOp->getReqNodeId() != g_ndb_cluster_connection->node_id();
1682
1683 if (pOp->getEventType() == NDBEVENT::TE_ALTER)
1684 {
1685 if (pOp->tableFrmChanged())
1686 {
1687 DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: table frm changed"));
1688 is_online_alter_table= TRUE;
1689 }
1690 else
1691 {
1692 DBUG_PRINT("info", ("NDBEVENT::TE_ALTER: name changed"));
1693 DBUG_ASSERT(pOp->tableNameChanged());
1694 is_rename_table= TRUE;
1695 }
1696 }
1697
1698 {
1699 ndb->setDatabaseName(dbname);
1700 Ndb_table_guard ndbtab_g(ndb->getDictionary(), tabname);
1701 const NDBTAB *ev_tab= pOp->getTable();
1702 const NDBTAB *cache_tab= ndbtab_g.get_table();
1703 if (cache_tab &&
1704 cache_tab->getObjectId() == ev_tab->getObjectId() &&
1705 cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
1706 ndbtab_g.invalidate();
1707 }
1708
1709 /*
1710 Refresh local frm file and dictionary cache if
1711 remote on-line alter table
1712 */
1713 if (is_remote_change && is_online_alter_table)
1714 {
1715 const char *tabname= table_share->table_name.str;
1716 char key[FN_REFLEN + 1];
1717 uchar *data= 0, *pack_data= 0;
1718 size_t length, pack_length;
1719 int error;
1720 NDBDICT *dict= ndb->getDictionary();
1721 const NDBTAB *altered_table= pOp->getTable();
1722
1723 DBUG_PRINT("info", ("Detected frm change of table %s.%s",
1724 dbname, tabname));
1725 build_table_filename(key, FN_LEN - 1, dbname, tabname, NullS, 0);
1726 /*
1727 If the there is no local table shadowing the altered table and
1728 it has an frm that is different than the one on disk then
1729 overwrite it with the new table definition
1730 */
1731 if (!ndbcluster_check_if_local_table(dbname, tabname) &&
1732 readfrm(key, &data, &length) == 0 &&
1733 packfrm(data, length, &pack_data, &pack_length) == 0 &&
1734 cmp_frm(altered_table, pack_data, pack_length))
1735 {
1736 DBUG_DUMP("frm", (uchar*) altered_table->getFrmData(),
1737 altered_table->getFrmLength());
1738 Ndb_table_guard ndbtab_g(dict, tabname);
1739 const NDBTAB *old= ndbtab_g.get_table();
1740 if (!old &&
1741 old->getObjectVersion() != altered_table->getObjectVersion())
1742 dict->putTable(altered_table);
1743
1744 my_free(data);
1745 data= NULL;
1746 if ((error= unpackfrm(&data, &length,
1747 (const uchar*) altered_table->getFrmData())) ||
1748 (error= writefrm(key, data, length)))
1749 {
1750 sql_print_information("NDB: Failed write frm for %s.%s, error %d",
1751 dbname, tabname, error);
1752 }
1753
1754 // copy names as memory will be freed
1755 NdbAutoPtr<char> a1((char *)(dbname= strdup(dbname)));
1756 NdbAutoPtr<char> a2((char *)(tabname= strdup(tabname)));
1757 ndbcluster_binlog_close_table(thd, share);
1758
1759 TABLE_LIST table_list;
1760 bzero((char*) &table_list,sizeof(table_list));
1761 table_list.db= (char *)dbname;
1762 table_list.alias= table_list.table_name= (char *)tabname;
1763 close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
1764
1765 if ((error= ndbcluster_binlog_open_table(thd, share,
1766 table_share, table, 1)))
1767 sql_print_information("NDB: Failed to re-open table %s.%s",
1768 dbname, tabname);
1769
1770 table= share->table;
1771 table_share= share->table_share;
1772 dbname= table_share->db.str;
1773 tabname= table_share->table_name.str;
1774 }
1775 my_free(data);
1776 my_free(pack_data);
1777 }
1778
1779 // If only frm was changed continue replicating
1780 if (is_online_alter_table)
1781 {
1782 /* Signal ha_ndbcluster::alter_table that drop is done */
1783 mysql_cond_signal(&injector_cond);
1784 DBUG_RETURN(0);
1785 }
1786
1787 mysql_mutex_lock(&share->mutex);
1788 if (is_rename_table && !is_remote_change)
1789 {
1790 DBUG_PRINT("info", ("Detected name change of table %s.%s",
1791 share->db, share->table_name));
1792 /* ToDo: remove printout */
1793 if (opt_ndb_extra_logging)
1794 sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
1795 share_prefix, share->table->s->db.str,
1796 share->table->s->table_name.str,
1797 share->key);
1798 {
1799 ndb->setDatabaseName(share->table->s->db.str);
1800 Ndb_table_guard ndbtab_g(ndb->getDictionary(),
1801 share->table->s->table_name.str);
1802 const NDBTAB *ev_tab= pOp->getTable();
1803 const NDBTAB *cache_tab= ndbtab_g.get_table();
1804 if (cache_tab &&
1805 cache_tab->getObjectId() == ev_tab->getObjectId() &&
1806 cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
1807 ndbtab_g.invalidate();
1808 }
1809 /* do the rename of the table in the share */
1810 share->table->s->db.str= share->db;
1811 share->table->s->db.length= strlen(share->db);
1812 share->table->s->table_name.str= share->table_name;
1813 share->table->s->table_name.length= strlen(share->table_name);
1814 }
1815 DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
1816 if (share->op_old == pOp)
1817 share->op_old= 0;
1818 else
1819 share->op= 0;
1820 // either just us or drop table handling as well
1821
1822 /* Signal ha_ndbcluster::delete/rename_table that drop is done */
1823 mysql_mutex_unlock(&share->mutex);
1824 mysql_cond_signal(&injector_cond);
1825
1826 mysql_mutex_lock(&ndbcluster_mutex);
1827 /* ndb_share reference binlog free */
1828 DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
1829 share->key, share->use_count));
1830 free_share(&share, TRUE);
1831 if (is_remote_change && share && share->state != NSS_DROPPED)
1832 {
1833 DBUG_PRINT("info", ("remote change"));
1834 share->state= NSS_DROPPED;
1835 if (share->use_count != 1)
1836 {
1837 /* open handler holding reference */
1838 /* wait with freeing create ndb_share to below */
1839 do_close_cached_tables= TRUE;
1840 }
1841 else
1842 {
1843 /* ndb_share reference create free */
1844 DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
1845 share->key, share->use_count));
1846 free_share(&share, TRUE);
1847 share= 0;
1848 }
1849 }
1850 else
1851 share= 0;
1852 mysql_mutex_unlock(&ndbcluster_mutex);
1853
1854 pOp->setCustomData(0);
1855
1856 mysql_mutex_lock(&injector_mutex);
1857 ndb->dropEventOperation(pOp);
1858 pOp= 0;
1859 mysql_mutex_unlock(&injector_mutex);
1860
1861 if (do_close_cached_tables)
1862 {
1863 TABLE_LIST table_list;
1864 bzero((char*) &table_list,sizeof(table_list));
1865 table_list.db= (char *)dbname;
1866 table_list.alias= table_list.table_name= (char *)tabname;
1867 close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
1868 /* ndb_share reference create free */
1869 DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
1870 share->key, share->use_count));
1871 free_share(&share);
1872 }
1873 DBUG_RETURN(0);
1874 }
1875
ndb_binlog_query(THD * thd,Cluster_schema * schema)1876 static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
1877 {
1878 if (schema->any_value & NDB_ANYVALUE_RESERVED)
1879 {
1880 if (schema->any_value != NDB_ANYVALUE_FOR_NOLOGGING)
1881 sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
1882 "query not logged",
1883 schema->any_value);
1884 return;
1885 }
1886 uint32 thd_server_id_save= thd->server_id;
1887 DBUG_ASSERT(sizeof(thd_server_id_save) == sizeof(thd->server_id));
1888 char *thd_db_save= thd->db;
1889 if (schema->any_value == 0)
1890 thd->server_id= ::server_id;
1891 else
1892 thd->server_id= schema->any_value;
1893 thd->db= schema->db;
1894 int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
1895 thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
1896 schema->query_length, FALSE, TRUE,
1897 schema->name[0] == 0 || thd->db[0] == 0,
1898 errcode);
1899 thd->server_id= thd_server_id_save;
1900 thd->db= thd_db_save;
1901 }
1902
1903 static int
ndb_binlog_thread_handle_schema_event(THD * thd,Ndb * ndb,NdbEventOperation * pOp,List<Cluster_schema> * post_epoch_log_list,List<Cluster_schema> * post_epoch_unlock_list,MEM_ROOT * mem_root)1904 ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *ndb,
1905 NdbEventOperation *pOp,
1906 List<Cluster_schema>
1907 *post_epoch_log_list,
1908 List<Cluster_schema>
1909 *post_epoch_unlock_list,
1910 MEM_ROOT *mem_root)
1911 {
1912 DBUG_ENTER("ndb_binlog_thread_handle_schema_event");
1913 NDB_SHARE *tmp_share= (NDB_SHARE *)pOp->getCustomData();
1914 if (tmp_share && ndb_schema_share == tmp_share)
1915 {
1916 NDBEVENT::TableEvent ev_type= pOp->getEventType();
1917 DBUG_PRINT("enter", ("%s.%s ev_type: %d",
1918 tmp_share->db, tmp_share->table_name, ev_type));
1919 if (ev_type == NDBEVENT::TE_UPDATE ||
1920 ev_type == NDBEVENT::TE_INSERT)
1921 {
1922 Cluster_schema *schema= (Cluster_schema *)
1923 sql_alloc(sizeof(Cluster_schema));
1924 MY_BITMAP slock;
1925 bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, FALSE);
1926 uint node_id= g_ndb_cluster_connection->node_id();
1927 {
1928 ndbcluster_get_schema(tmp_share, schema);
1929 schema->any_value= pOp->getAnyValue();
1930 }
1931 enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
1932 DBUG_PRINT("info",
1933 ("%s.%s: log query_length: %d query: '%s' type: %d",
1934 schema->db, schema->name,
1935 schema->query_length, schema->query,
1936 schema_type));
1937 if (schema_type == SOT_CLEAR_SLOCK)
1938 {
1939 /*
1940 handle slock after epoch is completed to ensure that
1941 schema events get inserted in the binlog after any data
1942 events
1943 */
1944 post_epoch_log_list->push_back(schema, mem_root);
1945 DBUG_RETURN(0);
1946 }
1947 if (schema->node_id != node_id)
1948 {
1949 int log_query= 0, post_epoch_unlock= 0;
1950 switch (schema_type)
1951 {
1952 case SOT_DROP_TABLE:
1953 // fall through
1954 case SOT_RENAME_TABLE:
1955 // fall through
1956 case SOT_RENAME_TABLE_NEW:
1957 // fall through
1958 case SOT_ALTER_TABLE:
1959 post_epoch_log_list->push_back(schema, mem_root);
1960 /* acknowledge this query _after_ epoch completion */
1961 post_epoch_unlock= 1;
1962 break;
1963 case SOT_TRUNCATE_TABLE:
1964 {
1965 char key[FN_REFLEN + 1];
1966 build_table_filename(key, sizeof(key) - 1,
1967 schema->db, schema->name, "", 0);
1968 /* ndb_share reference temporary, free below */
1969 NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
1970 if (share)
1971 {
1972 DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
1973 share->key, share->use_count));
1974 }
1975 // invalidation already handled by binlog thread
1976 if (!share || !share->op)
1977 {
1978 {
1979 injector_ndb->setDatabaseName(schema->db);
1980 Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(),
1981 schema->name);
1982 ndbtab_g.invalidate();
1983 }
1984 TABLE_LIST table_list;
1985 bzero((char*) &table_list,sizeof(table_list));
1986 table_list.db= schema->db;
1987 table_list.alias= table_list.table_name= schema->name;
1988 close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
1989 }
1990 /* ndb_share reference temporary free */
1991 if (share)
1992 {
1993 DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
1994 share->key, share->use_count));
1995 free_share(&share);
1996 }
1997 }
1998 // fall through
1999 case SOT_CREATE_TABLE:
2000 if (ndbcluster_check_if_local_table(schema->db, schema->name))
2001 {
2002 DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
2003 schema->db, schema->name));
2004 sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
2005 "binlog schema event '%s' from node %d. ",
2006 schema->db, schema->name, schema->query,
2007 schema->node_id);
2008 }
2009 else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
2010 {
2011 print_could_not_discover_error(thd, schema);
2012 }
2013 log_query= 1;
2014 break;
2015 case SOT_DROP_DB:
2016 /* Drop the database locally if it only contains ndb tables */
2017 if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db))
2018 {
2019 const int no_print_error[1]= {0};
2020 run_query(thd, schema->query,
2021 schema->query + schema->query_length,
2022 no_print_error, /* print error */
2023 TRUE); /* don't binlog the query */
2024 /* binlog dropping database after any table operations */
2025 post_epoch_log_list->push_back(schema, mem_root);
2026 /* acknowledge this query _after_ epoch completion */
2027 post_epoch_unlock= 1;
2028 }
2029 else
2030 {
2031 /* Database contained local tables, leave it */
2032 sql_print_error("NDB Binlog: Skipping drop database '%s' since it contained local tables "
2033 "binlog schema event '%s' from node %d. ",
2034 schema->db, schema->query,
2035 schema->node_id);
2036 log_query= 1;
2037 }
2038 break;
2039 case SOT_CREATE_DB:
2040 /* fall through */
2041 case SOT_ALTER_DB:
2042 {
2043 const int no_print_error[1]= {0};
2044 run_query(thd, schema->query,
2045 schema->query + schema->query_length,
2046 no_print_error, /* print error */
2047 TRUE); /* don't binlog the query */
2048 log_query= 1;
2049 break;
2050 }
2051 case SOT_TABLESPACE:
2052 case SOT_LOGFILE_GROUP:
2053 log_query= 1;
2054 break;
2055 case SOT_CLEAR_SLOCK:
2056 abort();
2057 }
2058 if (log_query && ndb_binlog_running)
2059 ndb_binlog_query(thd, schema);
2060 /* signal that schema operation has been handled */
2061 DBUG_DUMP("slock", (uchar*) schema->slock, schema->slock_length);
2062 if (bitmap_is_set(&slock, node_id))
2063 {
2064 if (post_epoch_unlock)
2065 post_epoch_unlock_list->push_back(schema, mem_root);
2066 else
2067 ndbcluster_update_slock(thd, schema->db, schema->name);
2068 }
2069 }
2070 DBUG_RETURN(0);
2071 }
2072 /*
2073 the normal case of UPDATE/INSERT has already been handled
2074 */
2075 switch (ev_type)
2076 {
2077 case NDBEVENT::TE_DELETE:
2078 // skip
2079 break;
2080 case NDBEVENT::TE_CLUSTER_FAILURE:
2081 if (opt_ndb_extra_logging)
2082 sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
2083 ndb_schema_share->key, (unsigned) pOp->getGCI());
2084 // fall through
2085 case NDBEVENT::TE_DROP:
2086 if (opt_ndb_extra_logging &&
2087 ndb_binlog_tables_inited && ndb_binlog_running)
2088 sql_print_information("NDB Binlog: ndb tables initially "
2089 "read only on reconnect.");
2090
2091 /* begin protect ndb_schema_share */
2092 mysql_mutex_lock(&ndb_schema_share_mutex);
2093 /* ndb_share reference binlog extra free */
2094 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
2095 ndb_schema_share->key,
2096 ndb_schema_share->use_count));
2097 free_share(&ndb_schema_share);
2098 ndb_schema_share= 0;
2099 ndb_binlog_tables_inited= 0;
2100 mysql_mutex_unlock(&ndb_schema_share_mutex);
2101 /* end protect ndb_schema_share */
2102
2103 close_cached_tables(NULL, NULL, FALSE, LONG_TIMEOUT);
2104 // fall through
2105 case NDBEVENT::TE_ALTER:
2106 ndb_handle_schema_change(thd, ndb, pOp, tmp_share);
2107 break;
2108 case NDBEVENT::TE_NODE_FAILURE:
2109 {
2110 uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
2111 DBUG_ASSERT(node_id != 0xFF);
2112 mysql_mutex_lock(&tmp_share->mutex);
2113 bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]);
2114 DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
2115 if (opt_ndb_extra_logging)
2116 {
2117 sql_print_information("NDB Binlog: Node: %d, down,"
2118 " Subscriber bitmask %x%x",
2119 pOp->getNdbdNodeId(),
2120 tmp_share->subscriber_bitmap[node_id].bitmap[1],
2121 tmp_share->subscriber_bitmap[node_id].bitmap[0]);
2122 }
2123 mysql_mutex_unlock(&tmp_share->mutex);
2124 mysql_cond_signal(&injector_cond);
2125 break;
2126 }
2127 case NDBEVENT::TE_SUBSCRIBE:
2128 {
2129 uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
2130 uint8 req_id= pOp->getReqNodeId();
2131 DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
2132 mysql_mutex_lock(&tmp_share->mutex);
2133 bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
2134 DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
2135 if (opt_ndb_extra_logging)
2136 {
2137 sql_print_information("NDB Binlog: Node: %d, subscribe from node %d,"
2138 " Subscriber bitmask %x%x",
2139 pOp->getNdbdNodeId(),
2140 req_id,
2141 tmp_share->subscriber_bitmap[node_id].bitmap[1],
2142 tmp_share->subscriber_bitmap[node_id].bitmap[0]);
2143 }
2144 mysql_mutex_unlock(&tmp_share->mutex);
2145 mysql_cond_signal(&injector_cond);
2146 break;
2147 }
2148 case NDBEVENT::TE_UNSUBSCRIBE:
2149 {
2150 uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
2151 uint8 req_id= pOp->getReqNodeId();
2152 DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
2153 mysql_mutex_lock(&tmp_share->mutex);
2154 bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
2155 DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
2156 if (opt_ndb_extra_logging)
2157 {
2158 sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d,"
2159 " Subscriber bitmask %x%x",
2160 pOp->getNdbdNodeId(),
2161 req_id,
2162 tmp_share->subscriber_bitmap[node_id].bitmap[1],
2163 tmp_share->subscriber_bitmap[node_id].bitmap[0]);
2164 }
2165 mysql_mutex_unlock(&tmp_share->mutex);
2166 mysql_cond_signal(&injector_cond);
2167 break;
2168 }
2169 default:
2170 sql_print_error("NDB Binlog: unknown non data event %d for %s. "
2171 "Ignoring...", (unsigned) ev_type, tmp_share->key);
2172 }
2173 }
2174 DBUG_RETURN(0);
2175 }
2176
/*
  process any operations that should be done after
  the epoch is complete

  Drains post_epoch_log_list: for each queued schema operation, perform
  the deferred local work for its type and, if binlogging is running,
  write the query to the binlog via ndb_binlog_query().  Afterwards
  drains post_epoch_unlock_list, calling ndbcluster_update_slock() for
  each entry.

  @param thd                     thread used for local DDL/binlogging
  @param post_epoch_log_list     schema ops to process (emptied)
  @param post_epoch_unlock_list  schema ops to acknowledge (emptied)
*/
static void
ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
                                                 List<Cluster_schema>
                                                 *post_epoch_log_list,
                                                 List<Cluster_schema>
                                                 *post_epoch_unlock_list)
{
  /*
    NOTE(review): when the log list is empty the unlock list is not
    drained either.  This assumes entries are only ever queued on the
    unlock list together with an entry on the log list (as the
    SOT_DROP_DB path in the event handler does) -- confirm for all
    callers.
  */
  if (post_epoch_log_list->elements == 0)
    return;
  DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
  Cluster_schema *schema;
  while ((schema= post_epoch_log_list->pop()))
  {
    DBUG_PRINT("info",
               ("%s.%s: log query_length: %d query: '%s' type: %d",
                schema->db, schema->name,
                schema->query_length, schema->query,
                schema->type));
    int log_query= 0;
    {
      enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
      char key[FN_REFLEN + 1];
      build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
      if (schema_type == SOT_CLEAR_SLOCK)
      {
        /*
          Copy the received slock bitmap into the waiting schema object
          (if any) and wake waiters on injector_cond.  Never binlogged.
        */
        mysql_mutex_lock(&ndbcluster_mutex);
        NDB_SCHEMA_OBJECT *ndb_schema_object=
          (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
                                              (uchar*) key, strlen(key));
        if (ndb_schema_object)
        {
          mysql_mutex_lock(&ndb_schema_object->mutex);
          memcpy(ndb_schema_object->slock, schema->slock,
                 sizeof(ndb_schema_object->slock));
          DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
                    (uchar*)ndb_schema_object->slock_bitmap.bitmap,
                    no_bytes_in_map(&ndb_schema_object->slock_bitmap));
          mysql_mutex_unlock(&ndb_schema_object->mutex);
          mysql_cond_signal(&injector_cond);
        }
        mysql_mutex_unlock(&ndbcluster_mutex);
        continue;
      }
      /* ndb_share reference temporary, free below */
      NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
      if (share)
      {
        DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
                                 share->key, share->use_count));
      }
      /*
        Summary of the fall-through chain below:
          DROP_DB           -> log only
          DROP_TABLE        -> log; if no event op on the share, also
                               invalidate + close cached tables
          RENAME_TABLE      -> invalidate + close cached tables (no log)
          ALTER_TABLE       -> invalidate + close, then as RENAME_TABLE_NEW
          RENAME_TABLE_NEW  -> log; re-discover the table from NDB
      */
      switch (schema_type)
      {
      case SOT_DROP_DB:
        log_query= 1;
        break;
      case SOT_DROP_TABLE:
        log_query= 1;
        // invalidation already handled by binlog thread
        if (share && share->op)
        {
          break;
        }
        // fall through
      case SOT_RENAME_TABLE:
        // fall through
      case SOT_ALTER_TABLE:
        // invalidation already handled by binlog thread
        if (!share || !share->op)
        {
          {
            /* Drop the cached NDB dictionary entry for the table */
            injector_ndb->setDatabaseName(schema->db);
            Ndb_table_guard ndbtab_g(injector_ndb->getDictionary(),
                                     schema->name);
            ndbtab_g.invalidate();
          }
          TABLE_LIST table_list;
          bzero((char*) &table_list,sizeof(table_list));
          table_list.db= schema->db;
          table_list.alias= table_list.table_name= schema->name;
          close_cached_tables(thd, &table_list, FALSE, LONG_TIMEOUT);
        }
        if (schema_type != SOT_ALTER_TABLE)
          break;
        // fall through
      case SOT_RENAME_TABLE_NEW:
        log_query= 1;
        if (ndb_binlog_running && (!share || !share->op))
        {
          /*
            we need to free any share here as command below
            may need to call handle_trailing_share
          */
          if (share)
          {
            /* ndb_share reference temporary free */
            DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
                                     share->key, share->use_count));
            free_share(&share);
            share= 0;
          }
          /* Tables defined locally (non-NDB) are not re-discovered */
          if (ndbcluster_check_if_local_table(schema->db, schema->name))
          {
            DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
                                schema->db, schema->name));
            sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
                            "binlog schema event '%s' from node %d. ",
                            schema->db, schema->name, schema->query,
                            schema->node_id);
          }
          else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
          {
            print_could_not_discover_error(thd, schema);
          }
        }
        break;
      default:
        DBUG_ASSERT(FALSE);
      }
      /* Release the temporary share reference if still held */
      if (share)
      {
        /* ndb_share reference temporary free */
        DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
                                 share->key, share->use_count));
        free_share(&share);
        share= 0;
      }
    }
    if (ndb_binlog_running && log_query)
      ndb_binlog_query(thd, schema);
  }
  /* Acknowledge the operations that were deferred until after the epoch */
  while ((schema= post_epoch_unlock_list->pop()))
  {
    ndbcluster_update_slock(thd, schema->db, schema->name);
  }
  DBUG_VOID_RETURN;
}
2317
2318 /*
2319 Timer class for doing performance measurements
2320 */
2321
/*********************************************************************
  Internal helper functions for handling the cluster replication tables
  - ndb_binlog_index
  - ndb_apply_status
*********************************************************************/
2327
/*
  struct to hold the data to be inserted into the
  ndb_binlog_index table

  ndb_add_ndb_binlog_index() copies each member into the column index
  noted next to it.
*/
struct ndb_binlog_index_row {
  ulonglong gci;                /* field[2]; NOTE(review): presumably the epoch/GCI -- confirm */
  const char *master_log_file;  /* field[1]; binlog file name (must be NUL-terminated) */
  ulonglong master_log_pos;     /* field[0]; position within master_log_file */
  ulonglong n_inserts;          /* field[3] */
  ulonglong n_updates;          /* field[4] */
  ulonglong n_deletes;          /* field[5] */
  ulonglong n_schemaops;        /* field[6] */
};
2341
2342 /*
2343 Open the ndb_binlog_index table
2344 */
open_ndb_binlog_index(THD * thd,TABLE ** ndb_binlog_index)2345 static int open_ndb_binlog_index(THD *thd, TABLE **ndb_binlog_index)
2346 {
2347 static char repdb[]= NDB_REP_DB;
2348 static char reptable[]= NDB_REP_TABLE;
2349 const char *save_proc_info= thd->proc_info;
2350 TABLE_LIST *tables= &binlog_tables;
2351
2352 tables->init_one_table(repdb, strlen(repdb), reptable, strlen(reptable),
2353 reptable, TL_WRITE);
2354 thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE;
2355
2356 tables->required_type= FRMTYPE_TABLE;
2357 thd->clear_error();
2358 if (open_and_lock_tables(thd, tables, FALSE, 0))
2359 {
2360 if (thd->killed)
2361 sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed");
2362 else
2363 sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'",
2364 thd->stmt_da->sql_errno(),
2365 thd->stmt_da->message());
2366 thd->proc_info= save_proc_info;
2367 return -1;
2368 }
2369 *ndb_binlog_index= tables->table;
2370 thd->proc_info= save_proc_info;
2371 (*ndb_binlog_index)->use_all_columns();
2372 return 0;
2373 }
2374
2375
2376 /*
2377 Insert one row in the ndb_binlog_index
2378 */
2379
ndb_add_ndb_binlog_index(THD * thd,void * _row)2380 int ndb_add_ndb_binlog_index(THD *thd, void *_row)
2381 {
2382 ndb_binlog_index_row &row= *(ndb_binlog_index_row *) _row;
2383 int error= 0;
2384 /*
2385 Turn of binlogging to prevent the table changes to be written to
2386 the binary log.
2387 */
2388 ulong saved_options= thd->variables.option_bits;
2389 thd->variables.option_bits&= ~OPTION_BIN_LOG;
2390
2391 if (!ndb_binlog_index && open_ndb_binlog_index(thd, &ndb_binlog_index))
2392 {
2393 sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
2394 error= -1;
2395 goto add_ndb_binlog_index_err;
2396 }
2397
2398 /*
2399 Intialize ndb_binlog_index->record[0]
2400 */
2401 empty_record(ndb_binlog_index);
2402
2403 ndb_binlog_index->field[0]->store(row.master_log_pos);
2404 ndb_binlog_index->field[1]->store(row.master_log_file,
2405 strlen(row.master_log_file),
2406 &my_charset_bin);
2407 ndb_binlog_index->field[2]->store(row.gci);
2408 ndb_binlog_index->field[3]->store(row.n_inserts);
2409 ndb_binlog_index->field[4]->store(row.n_updates);
2410 ndb_binlog_index->field[5]->store(row.n_deletes);
2411 ndb_binlog_index->field[6]->store(row.n_schemaops);
2412
2413 if ((error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0])))
2414 {
2415 sql_print_error("NDB Binlog: Writing row to ndb_binlog_index: %d", error);
2416 error= -1;
2417 goto add_ndb_binlog_index_err;
2418 }
2419
2420 add_ndb_binlog_index_err:
2421 thd->stmt_da->can_overwrite_status= TRUE;
2422 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
2423 thd->stmt_da->can_overwrite_status= FALSE;
2424 close_thread_tables(thd);
2425 /*
2426 There should be no need for rolling back transaction due to deadlock
2427 (since ndb_binlog_index is non transactional).
2428 */
2429 DBUG_ASSERT(! thd->transaction_rollback_request);
2430
2431 thd->mdl_context.release_transactional_locks();
2432 ndb_binlog_index= 0;
2433 thd->variables.option_bits= saved_options;
2434 return error;
2435 }
2436
2437 /*********************************************************************
2438 Functions for start, stop, wait for ndbcluster binlog thread
2439 *********************************************************************/
2440
/*
  Values taken by do_ndbcluster_binlog_close_connection, which starts
  out as BCCC_restart.  NOTE(review): names suggest the binlog thread's
  requested action (keep running / exit / restart) -- confirm at users.
*/
enum Binlog_thread_state
{
  BCCC_running,   /* 0 */
  BCCC_exit,      /* 1 */
  BCCC_restart    /* 2 */
};

static enum Binlog_thread_state
do_ndbcluster_binlog_close_connection= BCCC_restart;
2449
ndbcluster_binlog_start()2450 int ndbcluster_binlog_start()
2451 {
2452 DBUG_ENTER("ndbcluster_binlog_start");
2453
2454 if (::server_id == 0)
2455 {
2456 sql_print_warning("NDB: server id set to zero will cause any other mysqld "
2457 "with bin log to log with wrong server id");
2458 }
2459 else if (::server_id & 0x1 << 31)
2460 {
2461 sql_print_error("NDB: server id's with high bit set is reserved for internal "
2462 "purposes");
2463 DBUG_RETURN(-1);
2464 }
2465
2466 mysql_mutex_init(key_injector_mutex, &injector_mutex, MY_MUTEX_INIT_FAST);
2467 mysql_cond_init(key_injector_cond, &injector_cond, NULL);
2468 mysql_mutex_init(key_ndb_schema_share_mutex,
2469 &ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
2470
2471 /* Create injector thread */
2472 if (mysql_thread_create(key_thread_ndb_binlog,
2473 &ndb_binlog_thread, &connection_attrib,
2474 ndb_binlog_thread_func, 0))
2475 {
2476 DBUG_PRINT("error", ("Could not create ndb injector thread"));
2477 mysql_cond_destroy(&injector_cond);
2478 mysql_mutex_destroy(&injector_mutex);
2479 DBUG_RETURN(-1);
2480 }
2481
2482 ndbcluster_binlog_inited= 1;
2483
2484 /* Wait for the injector thread to start */
2485 mysql_mutex_lock(&injector_mutex);
2486 while (!ndb_binlog_thread_running)
2487 mysql_cond_wait(&injector_cond, &injector_mutex);
2488 mysql_mutex_unlock(&injector_mutex);
2489
2490 if (ndb_binlog_thread_running < 0)
2491 DBUG_RETURN(-1);
2492
2493 DBUG_RETURN(0);
2494 }
2495
2496
2497 /**************************************************************
2498 Internal helper functions for creating/dropping ndb events
2499 used by the client sql threads
2500 **************************************************************/
2501 void
ndb_rep_event_name(String * event_name,const char * db,const char * tbl)2502 ndb_rep_event_name(String *event_name,const char *db, const char *tbl)
2503 {
2504 event_name->set_ascii("REPL$", 5);
2505 event_name->append(db);
2506 if (tbl)
2507 {
2508 event_name->append('/');
2509 event_name->append(tbl);
2510 }
2511 }
2512
2513 bool
ndbcluster_check_if_local_table(const char * dbname,const char * tabname)2514 ndbcluster_check_if_local_table(const char *dbname, const char *tabname)
2515 {
2516 char key[FN_REFLEN + 1];
2517 char ndb_file[FN_REFLEN + 1];
2518
2519 DBUG_ENTER("ndbcluster_check_if_local_table");
2520 build_table_filename(key, FN_LEN-1, dbname, tabname, reg_ext, 0);
2521 build_table_filename(ndb_file, FN_LEN-1, dbname, tabname, ha_ndb_ext, 0);
2522 /* Check that any defined table is an ndb table */
2523 DBUG_PRINT("info", ("Looking for file %s and %s", key, ndb_file));
2524 if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK))
2525 {
2526 DBUG_PRINT("info", ("table file %s not on disk, local table", ndb_file));
2527
2528
2529 DBUG_RETURN(true);
2530 }
2531
2532 DBUG_RETURN(false);
2533 }
2534
2535 bool
ndbcluster_check_if_local_tables_in_db(THD * thd,const char * dbname)2536 ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname)
2537 {
2538 DBUG_ENTER("ndbcluster_check_if_local_tables_in_db");
2539 DBUG_PRINT("info", ("Looking for files in directory %s", dbname));
2540 LEX_STRING *tabname;
2541 List<LEX_STRING> files;
2542 char path[FN_REFLEN + 1];
2543
2544 build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0);
2545 if (find_files(thd, &files, dbname, path, NullS, 0, NULL) !=
2546 FIND_FILES_OK)
2547 {
2548 DBUG_PRINT("info", ("Failed to find files"));
2549 DBUG_RETURN(true);
2550 }
2551 DBUG_PRINT("info",("found: %d files", files.elements));
2552 while ((tabname= files.pop()))
2553 {
2554 DBUG_PRINT("info", ("Found table %s", tabname->str));
2555 if (ndbcluster_check_if_local_table(dbname, tabname->str))
2556 DBUG_RETURN(true);
2557 }
2558
2559 DBUG_RETURN(false);
2560 }
2561
2562 /*
2563 Common function for setting up everything for logging a table at
2564 create/discover.
2565 */
ndbcluster_create_binlog_setup(Ndb * ndb,const char * key,uint key_len,const char * db,const char * table_name,my_bool share_may_exist)2566 int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
2567 uint key_len,
2568 const char *db,
2569 const char *table_name,
2570 my_bool share_may_exist)
2571 {
2572 int do_event_op= ndb_binlog_running;
2573 DBUG_ENTER("ndbcluster_create_binlog_setup");
2574 DBUG_PRINT("enter",("key: %s key_len: %d %s.%s share_may_exist: %d",
2575 key, key_len, db, table_name, share_may_exist));
2576 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
2577 DBUG_ASSERT(strlen(key) == key_len);
2578
2579 mysql_mutex_lock(&ndbcluster_mutex);
2580
2581 /* Handle any trailing share */
2582 NDB_SHARE *share= (NDB_SHARE*) my_hash_search(&ndbcluster_open_tables,
2583 (uchar*) key, key_len);
2584
2585 if (share && share_may_exist)
2586 {
2587 if (share->flags & NSF_NO_BINLOG ||
2588 share->op != 0 ||
2589 share->op_old != 0)
2590 {
2591 mysql_mutex_unlock(&ndbcluster_mutex);
2592 DBUG_RETURN(0); // replication already setup, or should not
2593 }
2594 }
2595
2596 if (share)
2597 {
2598 if (share->op || share->op_old)
2599 {
2600 my_errno= HA_ERR_TABLE_EXIST;
2601 mysql_mutex_unlock(&ndbcluster_mutex);
2602 DBUG_RETURN(1);
2603 }
2604 if (!share_may_exist || share->connect_count !=
2605 g_ndb_cluster_connection->get_connect_count())
2606 {
2607 handle_trailing_share(share);
2608 share= NULL;
2609 }
2610 }
2611
2612 /* Create share which is needed to hold replication information */
2613 if (share)
2614 {
2615 /* ndb_share reference create */
2616 ++share->use_count;
2617 DBUG_PRINT("NDB_SHARE", ("%s create use_count: %u",
2618 share->key, share->use_count));
2619 }
2620 /* ndb_share reference create */
2621 else if (!(share= get_share(key, 0, TRUE, TRUE)))
2622 {
2623 sql_print_error("NDB Binlog: "
2624 "allocating table share for %s failed", key);
2625 }
2626 else
2627 {
2628 DBUG_PRINT("NDB_SHARE", ("%s create use_count: %u",
2629 share->key, share->use_count));
2630 }
2631
2632 if (!ndb_schema_share &&
2633 strcmp(share->db, NDB_REP_DB) == 0 &&
2634 strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
2635 do_event_op= 1;
2636 else if (!ndb_apply_status_share &&
2637 strcmp(share->db, NDB_REP_DB) == 0 &&
2638 strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
2639 do_event_op= 1;
2640
2641 if (!do_event_op)
2642 {
2643 share->flags|= NSF_NO_BINLOG;
2644 mysql_mutex_unlock(&ndbcluster_mutex);
2645 DBUG_RETURN(0);
2646 }
2647 mysql_mutex_unlock(&ndbcluster_mutex);
2648
2649 while (share && !IS_TMP_PREFIX(table_name))
2650 {
2651 /*
2652 ToDo make sanity check of share so that the table is actually the same
2653 I.e. we need to do open file from frm in this case
2654 Currently awaiting this to be fixed in the 4.1 tree in the general
2655 case
2656 */
2657
2658 /* Create the event in NDB */
2659 ndb->setDatabaseName(db);
2660
2661 NDBDICT *dict= ndb->getDictionary();
2662 Ndb_table_guard ndbtab_g(dict, table_name);
2663 const NDBTAB *ndbtab= ndbtab_g.get_table();
2664 if (ndbtab == 0)
2665 {
2666 if (opt_ndb_extra_logging)
2667 sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
2668 "%s, %d", key, dict->getNdbError().message,
2669 dict->getNdbError().code);
2670 break; // error
2671 }
2672 String event_name(INJECTOR_EVENT_LEN);
2673 ndb_rep_event_name(&event_name, db, table_name);
2674 /*
2675 event should have been created by someone else,
2676 but let's make sure, and create if it doesn't exist
2677 */
2678 const NDBEVENT *ev= dict->getEvent(event_name.c_ptr());
2679 if (!ev)
2680 {
2681 if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share))
2682 {
2683 sql_print_error("NDB Binlog: "
2684 "FAILED CREATE (DISCOVER) TABLE Event: %s",
2685 event_name.c_ptr());
2686 break; // error
2687 }
2688 if (opt_ndb_extra_logging)
2689 sql_print_information("NDB Binlog: "
2690 "CREATE (DISCOVER) TABLE Event: %s",
2691 event_name.c_ptr());
2692 }
2693 else
2694 {
2695 delete ev;
2696 if (opt_ndb_extra_logging)
2697 sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
2698 event_name.c_ptr());
2699 }
2700
2701 /*
2702 create the event operations for receiving logging events
2703 */
2704 if (ndbcluster_create_event_ops(share, ndbtab, event_name.c_ptr()))
2705 {
2706 sql_print_error("NDB Binlog:"
2707 "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
2708 event_name.c_ptr());
2709 /* a warning has been issued to the client */
2710 DBUG_RETURN(0);
2711 }
2712 DBUG_RETURN(0);
2713 }
2714 DBUG_RETURN(-1);
2715 }
2716
2717 int
ndbcluster_create_event(Ndb * ndb,const NDBTAB * ndbtab,const char * event_name,NDB_SHARE * share,int push_warning)2718 ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
2719 const char *event_name, NDB_SHARE *share,
2720 int push_warning)
2721 {
2722 THD *thd= current_thd;
2723 DBUG_ENTER("ndbcluster_create_event");
2724 DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
2725 ndbtab->getName(), ndbtab->getObjectVersion(),
2726 event_name, share ? share->key : "(nil)"));
2727 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
2728 if (!share)
2729 {
2730 DBUG_PRINT("info", ("share == NULL"));
2731 DBUG_RETURN(0);
2732 }
2733 if (share->flags & NSF_NO_BINLOG)
2734 {
2735 DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d",
2736 share->flags, share->flags & NSF_NO_BINLOG));
2737 DBUG_RETURN(0);
2738 }
2739
2740 NDBDICT *dict= ndb->getDictionary();
2741 NDBEVENT my_event(event_name);
2742 my_event.setTable(*ndbtab);
2743 my_event.addTableEvent(NDBEVENT::TE_ALL);
2744 if (share->flags & NSF_HIDDEN_PK)
2745 {
2746 if (share->flags & NSF_BLOB_FLAG)
2747 {
2748 sql_print_error("NDB Binlog: logging of table %s "
2749 "with BLOB attribute and no PK is not supported",
2750 share->key);
2751 if (push_warning)
2752 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
2753 ER_ILLEGAL_HA_CREATE_OPTION,
2754 ER(ER_ILLEGAL_HA_CREATE_OPTION),
2755 ndbcluster_hton_name,
2756 "Binlog of table with BLOB attribute and no PK");
2757
2758 share->flags|= NSF_NO_BINLOG;
2759 DBUG_RETURN(-1);
2760 }
2761 /* No primary key, subscribe for all attributes */
2762 my_event.setReport(NDBEVENT::ER_ALL);
2763 DBUG_PRINT("info", ("subscription all"));
2764 }
2765 else
2766 {
2767 if (ndb_schema_share || strcmp(share->db, NDB_REP_DB) ||
2768 strcmp(share->table_name, NDB_SCHEMA_TABLE))
2769 {
2770 my_event.setReport(NDBEVENT::ER_UPDATED);
2771 DBUG_PRINT("info", ("subscription only updated"));
2772 }
2773 else
2774 {
2775 my_event.setReport((NDBEVENT::EventReport)
2776 (NDBEVENT::ER_ALL | NDBEVENT::ER_SUBSCRIBE));
2777 DBUG_PRINT("info", ("subscription all and subscribe"));
2778 }
2779 }
2780 if (share->flags & NSF_BLOB_FLAG)
2781 my_event.mergeEvents(TRUE);
2782
2783 /* add all columns to the event */
2784 int n_cols= ndbtab->getNoOfColumns();
2785 for(int a= 0; a < n_cols; a++)
2786 my_event.addEventColumn(a);
2787
2788 if (dict->createEvent(my_event)) // Add event to database
2789 {
2790 if (dict->getNdbError().classification != NdbError::SchemaObjectExists)
2791 {
2792 /*
2793 failed, print a warning
2794 */
2795 if (push_warning > 1)
2796 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
2797 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2798 dict->getNdbError().code,
2799 dict->getNdbError().message, "NDB");
2800 sql_print_error("NDB Binlog: Unable to create event in database. "
2801 "Event: %s Error Code: %d Message: %s", event_name,
2802 dict->getNdbError().code, dict->getNdbError().message);
2803 DBUG_RETURN(-1);
2804 }
2805
2806 /*
2807 try retrieving the event, if table version/id matches, we will get
2808 a valid event. Otherwise we have a trailing event from before
2809 */
2810 const NDBEVENT *ev;
2811 if ((ev= dict->getEvent(event_name)))
2812 {
2813 delete ev;
2814 DBUG_RETURN(0);
2815 }
2816
2817 /*
2818 trailing event from before; an error, but try to correct it
2819 */
2820 if (dict->getNdbError().code == NDB_INVALID_SCHEMA_OBJECT &&
2821 dict->dropEvent(my_event.getName()))
2822 {
2823 if (push_warning > 1)
2824 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
2825 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2826 dict->getNdbError().code,
2827 dict->getNdbError().message, "NDB");
2828 sql_print_error("NDB Binlog: Unable to create event in database. "
2829 " Attempt to correct with drop failed. "
2830 "Event: %s Error Code: %d Message: %s",
2831 event_name,
2832 dict->getNdbError().code,
2833 dict->getNdbError().message);
2834 DBUG_RETURN(-1);
2835 }
2836
2837 /*
2838 try to add the event again
2839 */
2840 if (dict->createEvent(my_event))
2841 {
2842 if (push_warning > 1)
2843 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
2844 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2845 dict->getNdbError().code,
2846 dict->getNdbError().message, "NDB");
2847 sql_print_error("NDB Binlog: Unable to create event in database. "
2848 " Attempt to correct with drop ok, but create failed. "
2849 "Event: %s Error Code: %d Message: %s",
2850 event_name,
2851 dict->getNdbError().code,
2852 dict->getNdbError().message);
2853 DBUG_RETURN(-1);
2854 }
2855 #ifdef NDB_BINLOG_EXTRA_WARNINGS
2856 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
2857 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2858 0, "NDB Binlog: Removed trailing event",
2859 "NDB");
2860 #endif
2861 }
2862
2863 DBUG_RETURN(0);
2864 }
2865
is_ndb_compatible_type(Field * field)2866 inline int is_ndb_compatible_type(Field *field)
2867 {
2868 return
2869 !(field->flags & BLOB_FLAG) &&
2870 field->type() != MYSQL_TYPE_BIT &&
2871 field->pack_length() != 0;
2872 }
2873
2874 /*
2875 - create eventOperations for receiving log events
2876 - setup ndb recattrs for reception of log event data
2877 - "start" the event operation
2878
2879 used at create/discover of tables
2880 */
2881 int
ndbcluster_create_event_ops(NDB_SHARE * share,const NDBTAB * ndbtab,const char * event_name)2882 ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
2883 const char *event_name)
2884 {
2885 THD *thd= current_thd;
2886 /*
2887 we are in either create table or rename table so table should be
2888 locked, hence we can work with the share without locks
2889 */
2890
2891 DBUG_ENTER("ndbcluster_create_event_ops");
2892 DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name));
2893 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
2894
2895 DBUG_ASSERT(share != 0);
2896
2897 if (share->flags & NSF_NO_BINLOG)
2898 {
2899 DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x",
2900 share->flags));
2901 DBUG_RETURN(0);
2902 }
2903
2904 int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0;
2905 if (!ndb_schema_share && strcmp(share->db, NDB_REP_DB) == 0 &&
2906 strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
2907 do_ndb_schema_share= 1;
2908 else if (!ndb_apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
2909 strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
2910 do_ndb_apply_status_share= 1;
2911 else if (!binlog_filter->db_ok(share->db) || !ndb_binlog_running)
2912 {
2913 share->flags|= NSF_NO_BINLOG;
2914 DBUG_RETURN(0);
2915 }
2916
2917 if (share->op)
2918 {
2919 assert(share->op->getCustomData() == (void *) share);
2920
2921 DBUG_ASSERT(share->use_count > 1);
2922 sql_print_error("NDB Binlog: discover reusing old ev op");
2923 /* ndb_share reference ToDo free */
2924 DBUG_PRINT("NDB_SHARE", ("%s ToDo free use_count: %u",
2925 share->key, share->use_count));
2926 free_share(&share); // old event op already has reference
2927 DBUG_RETURN(0);
2928 }
2929
2930 TABLE *table= share->table;
2931
2932 int retries= 100;
2933 /*
2934 100 milliseconds, temporary error on schema operation can
2935 take some time to be resolved
2936 */
2937 int retry_sleep= 100;
2938 while (1)
2939 {
2940 mysql_mutex_lock(&injector_mutex);
2941 Ndb *ndb= injector_ndb;
2942 if (do_ndb_schema_share)
2943 ndb= schema_ndb;
2944
2945 if (ndb == 0)
2946 {
2947 mysql_mutex_unlock(&injector_mutex);
2948 DBUG_RETURN(-1);
2949 }
2950
2951 NdbEventOperation* op;
2952 if (do_ndb_schema_share)
2953 op= ndb->createEventOperation(event_name);
2954 else
2955 {
2956 // set injector_ndb database/schema from table internal name
2957 int ret= ndb->setDatabaseAndSchemaName(ndbtab);
2958 assert(ret == 0);
2959 op= ndb->createEventOperation(event_name);
2960 // reset to catch errors
2961 ndb->setDatabaseName("");
2962 }
2963 if (!op)
2964 {
2965 sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
2966 " %s",event_name);
2967 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
2968 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2969 ndb->getNdbError().code,
2970 ndb->getNdbError().message,
2971 "NDB");
2972 mysql_mutex_unlock(&injector_mutex);
2973 DBUG_RETURN(-1);
2974 }
2975
2976 if (share->flags & NSF_BLOB_FLAG)
2977 op->mergeEvents(TRUE); // currently not inherited from event
2978
2979 DBUG_PRINT("info", ("share->ndb_value[0]: 0x%lx share->ndb_value[1]: 0x%lx",
2980 (long) share->ndb_value[0],
2981 (long) share->ndb_value[1]));
2982 int n_columns= ndbtab->getNoOfColumns();
2983 int n_fields= table ? table->s->fields : 0; // XXX ???
2984 for (int j= 0; j < n_columns; j++)
2985 {
2986 const char *col_name= ndbtab->getColumn(j)->getName();
2987 NdbValue attr0, attr1;
2988 if (j < n_fields)
2989 {
2990 Field *f= share->table->field[j];
2991 if (is_ndb_compatible_type(f))
2992 {
2993 DBUG_PRINT("info", ("%s compatible", col_name));
2994 attr0.rec= op->getValue(col_name, (char*) f->ptr);
2995 attr1.rec= op->getPreValue(col_name,
2996 (f->ptr - share->table->record[0]) +
2997 (char*) share->table->record[1]);
2998 }
2999 else if (! (f->flags & BLOB_FLAG))
3000 {
3001 DBUG_PRINT("info", ("%s non compatible", col_name));
3002 attr0.rec= op->getValue(col_name);
3003 attr1.rec= op->getPreValue(col_name);
3004 }
3005 else
3006 {
3007 DBUG_PRINT("info", ("%s blob", col_name));
3008 DBUG_ASSERT(share->flags & NSF_BLOB_FLAG);
3009 attr0.blob= op->getBlobHandle(col_name);
3010 attr1.blob= op->getPreBlobHandle(col_name);
3011 if (attr0.blob == NULL || attr1.blob == NULL)
3012 {
3013 sql_print_error("NDB Binlog: Creating NdbEventOperation"
3014 " blob field %u handles failed (code=%d) for %s",
3015 j, op->getNdbError().code, event_name);
3016 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
3017 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
3018 op->getNdbError().code,
3019 op->getNdbError().message,
3020 "NDB");
3021 ndb->dropEventOperation(op);
3022 mysql_mutex_unlock(&injector_mutex);
3023 DBUG_RETURN(-1);
3024 }
3025 }
3026 }
3027 else
3028 {
3029 DBUG_PRINT("info", ("%s hidden key", col_name));
3030 attr0.rec= op->getValue(col_name);
3031 attr1.rec= op->getPreValue(col_name);
3032 }
3033 share->ndb_value[0][j].ptr= attr0.ptr;
3034 share->ndb_value[1][j].ptr= attr1.ptr;
3035 DBUG_PRINT("info", ("&share->ndb_value[0][%d]: 0x%lx "
3036 "share->ndb_value[0][%d]: 0x%lx",
3037 j, (long) &share->ndb_value[0][j],
3038 j, (long) attr0.ptr));
3039 DBUG_PRINT("info", ("&share->ndb_value[1][%d]: 0x%lx "
3040 "share->ndb_value[1][%d]: 0x%lx",
3041 j, (long) &share->ndb_value[0][j],
3042 j, (long) attr1.ptr));
3043 }
3044 op->setCustomData((void *) share); // set before execute
3045 share->op= op; // assign op in NDB_SHARE
3046 if (op->execute())
3047 {
3048 share->op= NULL;
3049 retries--;
3050 if (op->getNdbError().status != NdbError::TemporaryError &&
3051 op->getNdbError().code != 1407)
3052 retries= 0;
3053 if (retries == 0)
3054 {
3055 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
3056 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
3057 op->getNdbError().code, op->getNdbError().message,
3058 "NDB");
3059 sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
3060 event_name,
3061 op->getNdbError().code, op->getNdbError().message);
3062 }
3063 ndb->dropEventOperation(op);
3064 mysql_mutex_unlock(&injector_mutex);
3065 if (retries)
3066 {
3067 my_sleep(retry_sleep);
3068 continue;
3069 }
3070 DBUG_RETURN(-1);
3071 }
3072 mysql_mutex_unlock(&injector_mutex);
3073 break;
3074 }
3075
3076 /* ndb_share reference binlog */
3077 get_share(share);
3078 DBUG_PRINT("NDB_SHARE", ("%s binlog use_count: %u",
3079 share->key, share->use_count));
3080 if (do_ndb_apply_status_share)
3081 {
3082 /* ndb_share reference binlog extra */
3083 ndb_apply_status_share= get_share(share);
3084 DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
3085 share->key, share->use_count));
3086 mysql_cond_signal(&injector_cond);
3087 }
3088 else if (do_ndb_schema_share)
3089 {
3090 /* ndb_share reference binlog extra */
3091 ndb_schema_share= get_share(share);
3092 DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
3093 share->key, share->use_count));
3094 mysql_cond_signal(&injector_cond);
3095 }
3096
3097 DBUG_PRINT("info",("%s share->op: 0x%lx share->use_count: %u",
3098 share->key, (long) share->op, share->use_count));
3099
3100 if (opt_ndb_extra_logging)
3101 sql_print_information("NDB Binlog: logging %s", share->key);
3102 DBUG_RETURN(0);
3103 }
3104
3105 /*
3106 when entering the calling thread should have a share lock id share != 0
3107 then the injector thread will have one as well, i.e. share->use_count == 0
3108 (unless it has already dropped... then share->op == 0)
3109 */
3110 int
ndbcluster_handle_drop_table(Ndb * ndb,const char * event_name,NDB_SHARE * share,const char * type_str)3111 ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
3112 NDB_SHARE *share, const char *type_str)
3113 {
3114 DBUG_ENTER("ndbcluster_handle_drop_table");
3115 THD *thd= current_thd;
3116
3117 NDBDICT *dict= ndb->getDictionary();
3118 if (event_name && dict->dropEvent(event_name))
3119 {
3120 if (dict->getNdbError().code != 4710)
3121 {
3122 /* drop event failed for some reason, issue a warning */
3123 push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
3124 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
3125 dict->getNdbError().code,
3126 dict->getNdbError().message, "NDB");
3127 /* error is not that the event did not exist */
3128 sql_print_error("NDB Binlog: Unable to drop event in database. "
3129 "Event: %s Error Code: %d Message: %s",
3130 event_name,
3131 dict->getNdbError().code,
3132 dict->getNdbError().message);
3133 /* ToDo; handle error? */
3134 if (share && share->op &&
3135 share->op->getState() == NdbEventOperation::EO_EXECUTING &&
3136 dict->getNdbError().mysql_code != HA_ERR_NO_CONNECTION)
3137 {
3138 DBUG_ASSERT(FALSE);
3139 DBUG_RETURN(-1);
3140 }
3141 }
3142 }
3143
3144 if (share == 0 || share->op == 0)
3145 {
3146 DBUG_RETURN(0);
3147 }
3148
3149 /*
3150 Syncronized drop between client thread and injector thread is
3151 neccessary in order to maintain ordering in the binlog,
3152 such that the drop occurs _after_ any inserts/updates/deletes.
3153
3154 The penalty for this is that the drop table becomes slow.
3155
3156 This wait is however not strictly neccessary to produce a binlog
3157 that is usable. However the slave does not currently handle
3158 these out of order, thus we are keeping the SYNC_DROP_ defined
3159 for now.
3160 */
3161 const char *save_proc_info= thd->proc_info;
3162 #define SYNC_DROP_
3163 #ifdef SYNC_DROP_
3164 thd->proc_info= "Syncing ndb table schema operation and binlog";
3165 mysql_mutex_lock(&share->mutex);
3166 int max_timeout= DEFAULT_SYNC_TIMEOUT;
3167 while (share->op)
3168 {
3169 struct timespec abstime;
3170 set_timespec(abstime, 1);
3171 int ret= mysql_cond_timedwait(&injector_cond,
3172 &share->mutex,
3173 &abstime);
3174 if (thd->killed ||
3175 share->op == 0)
3176 break;
3177 if (ret)
3178 {
3179 max_timeout--;
3180 if (max_timeout == 0)
3181 {
3182 sql_print_error("NDB %s: %s timed out. Ignoring...",
3183 type_str, share->key);
3184 break;
3185 }
3186 if (opt_ndb_extra_logging)
3187 ndb_report_waiting(type_str, max_timeout,
3188 type_str, share->key);
3189 }
3190 }
3191 mysql_mutex_unlock(&share->mutex);
3192 #else
3193 mysql_mutex_lock(&share->mutex);
3194 share->op_old= share->op;
3195 share->op= 0;
3196 mysql_mutex_unlock(&share->mutex);
3197 #endif
3198 thd->proc_info= save_proc_info;
3199
3200 DBUG_RETURN(0);
3201 }
3202
3203
3204 /********************************************************************
3205 Internal helper functions for differentd events from the stoarage nodes
3206 used by the ndb injector thread
3207 ********************************************************************/
3208
3209 /*
3210 Handle error states on events from the storage nodes
3211 */
ndb_binlog_thread_handle_error(Ndb * ndb,NdbEventOperation * pOp,ndb_binlog_index_row & row)3212 static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp,
3213 ndb_binlog_index_row &row)
3214 {
3215 NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
3216 DBUG_ENTER("ndb_binlog_thread_handle_error");
3217
3218 int overrun= pOp->isOverrun();
3219 if (overrun)
3220 {
3221 /*
3222 ToDo: this error should rather clear the ndb_binlog_index...
3223 and continue
3224 */
3225 sql_print_error("NDB Binlog: Overrun in event buffer, "
3226 "this means we have dropped events. Cannot "
3227 "continue binlog for %s", share->key);
3228 pOp->clearError();
3229 DBUG_RETURN(-1);
3230 }
3231
3232 if (!pOp->isConsistent())
3233 {
3234 /*
3235 ToDo: this error should rather clear the ndb_binlog_index...
3236 and continue
3237 */
3238 sql_print_error("NDB Binlog: Not Consistent. Cannot "
3239 "continue binlog for %s. Error code: %d"
3240 " Message: %s", share->key,
3241 pOp->getNdbError().code,
3242 pOp->getNdbError().message);
3243 pOp->clearError();
3244 DBUG_RETURN(-1);
3245 }
3246 sql_print_error("NDB Binlog: unhandled error %d for table %s",
3247 pOp->hasError(), share->key);
3248 pOp->clearError();
3249 DBUG_RETURN(0);
3250 }
3251
3252 static int
ndb_binlog_thread_handle_non_data_event(THD * thd,Ndb * ndb,NdbEventOperation * pOp,ndb_binlog_index_row & row)3253 ndb_binlog_thread_handle_non_data_event(THD *thd, Ndb *ndb,
3254 NdbEventOperation *pOp,
3255 ndb_binlog_index_row &row)
3256 {
3257 NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
3258 NDBEVENT::TableEvent type= pOp->getEventType();
3259
3260 switch (type)
3261 {
3262 case NDBEVENT::TE_CLUSTER_FAILURE:
3263 if (opt_ndb_extra_logging)
3264 sql_print_information("NDB Binlog: cluster failure for %s at epoch %u.",
3265 share->key, (unsigned) pOp->getGCI());
3266 if (ndb_apply_status_share == share)
3267 {
3268 if (opt_ndb_extra_logging &&
3269 ndb_binlog_tables_inited && ndb_binlog_running)
3270 sql_print_information("NDB Binlog: ndb tables initially "
3271 "read only on reconnect.");
3272 /* ndb_share reference binlog extra free */
3273 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
3274 share->key, share->use_count));
3275 free_share(&ndb_apply_status_share);
3276 ndb_apply_status_share= 0;
3277 ndb_binlog_tables_inited= 0;
3278 }
3279 DBUG_PRINT("error", ("CLUSTER FAILURE EVENT: "
3280 "%s received share: 0x%lx op: 0x%lx share op: 0x%lx "
3281 "op_old: 0x%lx",
3282 share->key, (long) share, (long) pOp,
3283 (long) share->op, (long) share->op_old));
3284 break;
3285 case NDBEVENT::TE_DROP:
3286 if (ndb_apply_status_share == share)
3287 {
3288 if (opt_ndb_extra_logging &&
3289 ndb_binlog_tables_inited && ndb_binlog_running)
3290 sql_print_information("NDB Binlog: ndb tables initially "
3291 "read only on reconnect.");
3292 /* ndb_share reference binlog extra free */
3293 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
3294 share->key, share->use_count));
3295 free_share(&ndb_apply_status_share);
3296 ndb_apply_status_share= 0;
3297 ndb_binlog_tables_inited= 0;
3298 }
3299 /* ToDo: remove printout */
3300 if (opt_ndb_extra_logging)
3301 sql_print_information("NDB Binlog: drop table %s.", share->key);
3302 // fall through
3303 case NDBEVENT::TE_ALTER:
3304 row.n_schemaops++;
3305 DBUG_PRINT("info", ("TABLE %s EVENT: %s received share: 0x%lx op: 0x%lx "
3306 "share op: 0x%lx op_old: 0x%lx",
3307 type == NDBEVENT::TE_DROP ? "DROP" : "ALTER",
3308 share->key, (long) share, (long) pOp,
3309 (long) share->op, (long) share->op_old));
3310 break;
3311 case NDBEVENT::TE_NODE_FAILURE:
3312 /* fall through */
3313 case NDBEVENT::TE_SUBSCRIBE:
3314 /* fall through */
3315 case NDBEVENT::TE_UNSUBSCRIBE:
3316 /* ignore */
3317 return 0;
3318 default:
3319 sql_print_error("NDB Binlog: unknown non data event %d for %s. "
3320 "Ignoring...", (unsigned) type, share->key);
3321 return 0;
3322 }
3323
3324 ndb_handle_schema_change(thd, ndb, pOp, share);
3325 return 0;
3326 }
3327
3328 /*
3329 Handle data events from the storage nodes
3330 */
3331 static int
ndb_binlog_thread_handle_data_event(Ndb * ndb,NdbEventOperation * pOp,ndb_binlog_index_row & row,injector::transaction & trans)3332 ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
3333 ndb_binlog_index_row &row,
3334 injector::transaction &trans)
3335 {
3336 NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
3337 if (share == ndb_apply_status_share)
3338 return 0;
3339
3340 uint32 originating_server_id= pOp->getAnyValue();
3341 if (originating_server_id == 0)
3342 originating_server_id= ::server_id;
3343 else if (originating_server_id & NDB_ANYVALUE_RESERVED)
3344 {
3345 if (originating_server_id != NDB_ANYVALUE_FOR_NOLOGGING)
3346 sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
3347 "event not logged",
3348 originating_server_id);
3349 return 0;
3350 }
3351 else if (!g_ndb_log_slave_updates)
3352 {
3353 /*
3354 This event comes from a slave applier since it has an originating
3355 server id set. Since option to log slave updates is not set, skip it.
3356 */
3357 return 0;
3358 }
3359
3360 TABLE *table= share->table;
3361 DBUG_ASSERT(trans.good());
3362 DBUG_ASSERT(table != 0);
3363
3364 dbug_print_table("table", table);
3365
3366 TABLE_SHARE *table_s= table->s;
3367 uint n_fields= table_s->fields;
3368 MY_BITMAP b;
3369 /* Potential buffer for the bitmap */
3370 uint32 bitbuf[128 / (sizeof(uint32) * 8)];
3371 bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL,
3372 n_fields, FALSE);
3373 bitmap_set_all(&b);
3374
3375 /*
3376 row data is already in table->record[0]
3377 As we told the NdbEventOperation to do this
3378 (saves moving data about many times)
3379 */
3380
3381 /*
3382 for now malloc/free blobs buffer each time
3383 TODO if possible share single permanent buffer with handlers
3384 */
3385 uchar* blobs_buffer[2] = { 0, 0 };
3386 uint blobs_buffer_size[2] = { 0, 0 };
3387
3388 switch(pOp->getEventType())
3389 {
3390 case NDBEVENT::TE_INSERT:
3391 row.n_inserts++;
3392 DBUG_PRINT("info", ("INSERT INTO %s.%s",
3393 table_s->db.str, table_s->table_name.str));
3394 {
3395 if (share->flags & NSF_BLOB_FLAG)
3396 {
3397 my_ptrdiff_t ptrdiff= 0;
3398 int ret __attribute__((unused))= get_ndb_blobs_value(table, share->ndb_value[0],
3399 blobs_buffer[0],
3400 blobs_buffer_size[0],
3401 ptrdiff);
3402 DBUG_ASSERT(ret == 0);
3403 }
3404 ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
3405 int ret __attribute__((unused))= trans.write_row(originating_server_id,
3406 injector::transaction::table(table,
3407 TRUE),
3408 &b, n_fields, table->record[0]);
3409 DBUG_ASSERT(ret == 0);
3410 }
3411 break;
3412 case NDBEVENT::TE_DELETE:
3413 row.n_deletes++;
3414 DBUG_PRINT("info",("DELETE FROM %s.%s",
3415 table_s->db.str, table_s->table_name.str));
3416 {
3417 /*
3418 table->record[0] contains only the primary key in this case
3419 since we do not have an after image
3420 */
3421 int n;
3422 if (table->s->primary_key != MAX_KEY)
3423 n= 0; /*
3424 use the primary key only as it save time and space and
3425 it is the only thing needed to log the delete
3426 */
3427 else
3428 n= 1; /*
3429 we use the before values since we don't have a primary key
3430 since the mysql server does not handle the hidden primary
3431 key
3432 */
3433
3434 if (share->flags & NSF_BLOB_FLAG)
3435 {
3436 my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
3437 int ret __attribute__((unused))= get_ndb_blobs_value(table, share->ndb_value[n],
3438 blobs_buffer[n],
3439 blobs_buffer_size[n],
3440 ptrdiff);
3441 DBUG_ASSERT(ret == 0);
3442 }
3443 ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
3444 DBUG_EXECUTE("info", print_records(table, table->record[n]););
3445 int ret __attribute__((unused))= trans.delete_row(originating_server_id,
3446 injector::transaction::table(table,
3447 TRUE),
3448 &b, n_fields, table->record[n]);
3449 DBUG_ASSERT(ret == 0);
3450 }
3451 break;
3452 case NDBEVENT::TE_UPDATE:
3453 row.n_updates++;
3454 DBUG_PRINT("info", ("UPDATE %s.%s",
3455 table_s->db.str, table_s->table_name.str));
3456 {
3457 if (share->flags & NSF_BLOB_FLAG)
3458 {
3459 my_ptrdiff_t ptrdiff= 0;
3460 int ret __attribute__((unused))= get_ndb_blobs_value(table, share->ndb_value[0],
3461 blobs_buffer[0],
3462 blobs_buffer_size[0],
3463 ptrdiff);
3464 DBUG_ASSERT(ret == 0);
3465 }
3466 ndb_unpack_record(table, share->ndb_value[0],
3467 &b, table->record[0]);
3468 DBUG_EXECUTE("info", print_records(table, table->record[0]););
3469 if (table->s->primary_key != MAX_KEY)
3470 {
3471 /*
3472 since table has a primary key, we can do a write
3473 using only after values
3474 */
3475 trans.write_row(originating_server_id,
3476 injector::transaction::table(table, TRUE),
3477 &b, n_fields, table->record[0]);// after values
3478 }
3479 else
3480 {
3481 /*
3482 mysql server cannot handle the ndb hidden key and
3483 therefore needs the before image as well
3484 */
3485 if (share->flags & NSF_BLOB_FLAG)
3486 {
3487 my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
3488 int ret __attribute__((unused))= get_ndb_blobs_value(table, share->ndb_value[1],
3489 blobs_buffer[1],
3490 blobs_buffer_size[1],
3491 ptrdiff);
3492 DBUG_ASSERT(ret == 0);
3493 }
3494 ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
3495 DBUG_EXECUTE("info", print_records(table, table->record[1]););
3496 int ret __attribute__((unused))= trans.update_row(originating_server_id,
3497 injector::transaction::table(table,
3498 TRUE),
3499 &b, n_fields,
3500 table->record[1], // before values
3501 table->record[0]);// after values
3502 DBUG_ASSERT(ret == 0);
3503 }
3504 }
3505 break;
3506 default:
3507 /* We should REALLY never get here. */
3508 DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
3509 break;
3510 }
3511
3512 if (share->flags & NSF_BLOB_FLAG)
3513 {
3514 my_free(blobs_buffer[0]);
3515 my_free(blobs_buffer[1]);
3516 }
3517
3518 return 0;
3519 }
3520
//#define RUN_NDB_BINLOG_TIMER
#ifdef RUN_NDB_BINLOG_TIMER
/*
  Wall-clock stopwatch (gettimeofday based) used to time the injector
  main loop when RUN_NDB_BINLOG_TIMER is defined.  Construction starts it.
*/
class Timer
{
public:
  Timer() { start(); }
  void start() { gettimeofday(&m_start, 0); }
  void stop() { gettimeofday(&m_stop, 0); }
  /*
    Time between start() and stop(), rounded up to whole milliseconds.

    Seconds and microseconds are combined before rounding: rounding the
    microsecond difference on its own over-reported by 1 ms whenever it
    was negative (e.g. 1 s and -500000 us gave 501 ms instead of 500).
  */
  ulong elapsed_ms()
  {
    longlong elapsed_us=
      ((longlong) m_stop.tv_sec - (longlong) m_start.tv_sec) * 1000000 +
      ((longlong) m_stop.tv_usec - (longlong) m_start.tv_usec);
    return (ulong) ((elapsed_us + 999) / 1000);
  }
private:
  struct timeval m_start,m_stop;
};
#endif
3540
3541 /****************************************************************
3542 Injector thread main loop
3543 ****************************************************************/
3544
3545 static uchar *
ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT * schema_object,size_t * length,my_bool not_used)3546 ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT *schema_object,
3547 size_t *length,
3548 my_bool not_used __attribute__((unused)))
3549 {
3550 *length= schema_object->key_length;
3551 return (uchar*) schema_object->key;
3552 }
3553
ndb_get_schema_object(const char * key,my_bool create_if_not_exists,my_bool have_lock)3554 static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
3555 my_bool create_if_not_exists,
3556 my_bool have_lock)
3557 {
3558 NDB_SCHEMA_OBJECT *ndb_schema_object;
3559 uint length= (uint) strlen(key);
3560 DBUG_ENTER("ndb_get_schema_object");
3561 DBUG_PRINT("enter", ("key: '%s'", key));
3562
3563 if (!have_lock)
3564 mysql_mutex_lock(&ndbcluster_mutex);
3565 while (!(ndb_schema_object=
3566 (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
3567 (uchar*) key,
3568 length)))
3569 {
3570 if (!create_if_not_exists)
3571 {
3572 DBUG_PRINT("info", ("does not exist"));
3573 break;
3574 }
3575 if (!(ndb_schema_object=
3576 (NDB_SCHEMA_OBJECT*) my_malloc(sizeof(*ndb_schema_object) + length + 1,
3577 MYF(MY_WME | MY_ZEROFILL))))
3578 {
3579 DBUG_PRINT("info", ("malloc error"));
3580 break;
3581 }
3582 ndb_schema_object->key= (char *)(ndb_schema_object+1);
3583 memcpy(ndb_schema_object->key, key, length + 1);
3584 ndb_schema_object->key_length= length;
3585 if (my_hash_insert(&ndb_schema_objects, (uchar*) ndb_schema_object))
3586 {
3587 my_free(ndb_schema_object);
3588 break;
3589 }
3590 mysql_mutex_init(key_ndb_schema_object_mutex, &ndb_schema_object->mutex, MY_MUTEX_INIT_FAST);
3591 bitmap_init(&ndb_schema_object->slock_bitmap, ndb_schema_object->slock,
3592 sizeof(ndb_schema_object->slock)*8, FALSE);
3593 bitmap_clear_all(&ndb_schema_object->slock_bitmap);
3594 break;
3595 }
3596 if (ndb_schema_object)
3597 {
3598 ndb_schema_object->use_count++;
3599 DBUG_PRINT("info", ("use_count: %d", ndb_schema_object->use_count));
3600 }
3601 if (!have_lock)
3602 mysql_mutex_unlock(&ndbcluster_mutex);
3603 DBUG_RETURN(ndb_schema_object);
3604 }
3605
3606
ndb_free_schema_object(NDB_SCHEMA_OBJECT ** ndb_schema_object,bool have_lock)3607 static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
3608 bool have_lock)
3609 {
3610 DBUG_ENTER("ndb_free_schema_object");
3611 DBUG_PRINT("enter", ("key: '%s'", (*ndb_schema_object)->key));
3612 if (!have_lock)
3613 mysql_mutex_lock(&ndbcluster_mutex);
3614 if (!--(*ndb_schema_object)->use_count)
3615 {
3616 DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
3617 my_hash_delete(&ndb_schema_objects, (uchar*) *ndb_schema_object);
3618 mysql_mutex_destroy(&(*ndb_schema_object)->mutex);
3619 my_free(*ndb_schema_object);
3620 *ndb_schema_object= 0;
3621 }
3622 else
3623 {
3624 DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
3625 }
3626 if (!have_lock)
3627 mysql_mutex_unlock(&ndbcluster_mutex);
3628 DBUG_VOID_RETURN;
3629 }
3630
3631 extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
3632 extern ulong opt_ndb_report_thresh_binlog_mem_usage;
3633
ndb_binlog_thread_func(void * arg)3634 pthread_handler_t ndb_binlog_thread_func(void *arg)
3635 {
3636 THD *thd; /* needs to be first for thread_stack */
3637 Ndb *i_ndb= 0;
3638 Ndb *s_ndb= 0;
3639 Thd_ndb *thd_ndb=0;
3640 int ndb_update_ndb_binlog_index= 1;
3641 injector *inj= injector::instance();
3642 uint incident_id= 0;
3643
3644 #ifdef RUN_NDB_BINLOG_TIMER
3645 Timer main_timer;
3646 #endif
3647
3648 mysql_mutex_lock(&injector_mutex);
3649 /*
3650 Set up the Thread
3651 */
3652 my_thread_init();
3653 DBUG_ENTER("ndb_binlog_thread");
3654
3655 thd= new THD; /* note that contructor of THD uses DBUG_ */
3656 THD_CHECK_SENTRY(thd);
3657 thd->set_current_stmt_binlog_format_row();
3658
3659 /* We need to set thd->thread_id before thd->store_globals, or it will
3660 set an invalid value for thd->variables.pseudo_thread_id.
3661 */
3662 mysql_mutex_lock(&LOCK_thread_count);
3663 thd->thread_id= thread_id++;
3664 mysql_mutex_unlock(&LOCK_thread_count);
3665
3666 mysql_thread_set_psi_id(thd->thread_id);
3667
3668 thd->thread_stack= (char*) &thd; /* remember where our stack is */
3669 if (thd->store_globals())
3670 {
3671 thd->cleanup();
3672 delete thd;
3673 ndb_binlog_thread_running= -1;
3674 mysql_mutex_unlock(&injector_mutex);
3675 mysql_cond_signal(&injector_cond);
3676
3677 DBUG_LEAVE; // Must match DBUG_ENTER()
3678 my_thread_end();
3679 pthread_exit(0);
3680 return NULL; // Avoid compiler warnings
3681 }
3682
3683 thd->init_for_queries();
3684 thd->command= COM_DAEMON;
3685 thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
3686 thd->main_security_ctx.host_or_ip= "";
3687 thd->client_capabilities= 0;
3688 my_net_init(&thd->net, 0);
3689 thd->main_security_ctx.master_access= ~0;
3690 thd->main_security_ctx.priv_user[0]= 0;
3691 /* Do not use user-supplied timeout value for system threads. */
3692 thd->variables.lock_wait_timeout= LONG_TIMEOUT;
3693
3694 /*
3695 Set up ndb binlog
3696 */
3697 sql_print_information("Starting MySQL Cluster Binlog Thread");
3698
3699 pthread_detach_this_thread();
3700 thd->real_id= pthread_self();
3701 mysql_mutex_lock(&LOCK_thread_count);
3702 threads.append(thd);
3703 mysql_mutex_unlock(&LOCK_thread_count);
3704 thd->lex->start_transaction_opt= 0;
3705
3706 if (!(s_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
3707 s_ndb->init())
3708 {
3709 sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
3710 ndb_binlog_thread_running= -1;
3711 mysql_mutex_unlock(&injector_mutex);
3712 mysql_cond_signal(&injector_cond);
3713 goto err;
3714 }
3715
3716 // empty database
3717 if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
3718 i_ndb->init())
3719 {
3720 sql_print_error("NDB Binlog: Getting Ndb object failed");
3721 ndb_binlog_thread_running= -1;
3722 mysql_mutex_unlock(&injector_mutex);
3723 mysql_cond_signal(&injector_cond);
3724 goto err;
3725 }
3726
3727 /* init hash for schema object distribution */
3728 (void) my_hash_init(&ndb_schema_objects, system_charset_info, 32, 0, 0,
3729 (my_hash_get_key)ndb_schema_objects_get_key, 0, 0);
3730
3731 /*
3732 Expose global reference to our ndb object.
3733
3734 Used by both sql client thread and binlog thread to interact
3735 with the storage
3736 mysql_mutex_lock(&injector_mutex);
3737 */
3738 injector_thd= thd;
3739 injector_ndb= i_ndb;
3740 p_latest_trans_gci=
3741 injector_ndb->get_ndb_cluster_connection().get_latest_trans_gci();
3742 schema_ndb= s_ndb;
3743
3744 if (opt_bin_log)
3745 {
3746 ndb_binlog_running= TRUE;
3747 }
3748
3749 /* Thread start up completed */
3750 ndb_binlog_thread_running= 1;
3751 mysql_mutex_unlock(&injector_mutex);
3752 mysql_cond_signal(&injector_cond);
3753
3754 /*
3755 wait for mysql server to start (so that the binlog is started
3756 and thus can receive the first GAP event)
3757 */
3758 mysql_mutex_lock(&LOCK_server_started);
3759 while (!mysqld_server_started)
3760 {
3761 struct timespec abstime;
3762 set_timespec(abstime, 1);
3763 mysql_cond_timedwait(&COND_server_started, &LOCK_server_started,
3764 &abstime);
3765 if (ndbcluster_terminating)
3766 {
3767 mysql_mutex_unlock(&LOCK_server_started);
3768 goto err;
3769 }
3770 }
3771 mysql_mutex_unlock(&LOCK_server_started);
3772 restart:
3773 /*
3774 Main NDB Injector loop
3775 */
3776 while (ndb_binlog_running)
3777 {
3778 /*
3779 check if it is the first log, if so we do not insert a GAP event
3780 as there is really no log to have a GAP in
3781 */
3782 if (incident_id == 0)
3783 {
3784 LOG_INFO log_info;
3785 mysql_bin_log.get_current_log(&log_info);
3786 int len= strlen(log_info.log_file_name);
3787 uint no= 0;
3788 if ((sscanf(log_info.log_file_name + len - 6, "%u", &no) == 1) &&
3789 no == 1)
3790 {
3791 /* this is the fist log, so skip GAP event */
3792 break;
3793 }
3794 }
3795
3796 /*
3797 Always insert a GAP event as we cannot know what has happened
3798 in the cluster while not being connected.
3799 */
3800 LEX_STRING const msg[2]=
3801 {
3802 { C_STRING_WITH_LEN("mysqld startup") },
3803 { C_STRING_WITH_LEN("cluster disconnect")}
3804 };
3805 int error __attribute__((unused))=
3806 inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg[incident_id]);
3807 DBUG_ASSERT(!error);
3808 break;
3809 }
3810 incident_id= 1;
3811 {
3812 thd->proc_info= "Waiting for ndbcluster to start";
3813
3814 mysql_mutex_lock(&injector_mutex);
3815 while (!ndb_schema_share ||
3816 (ndb_binlog_running && !ndb_apply_status_share))
3817 {
3818 /* ndb not connected yet */
3819 struct timespec abstime;
3820 set_timespec(abstime, 1);
3821 mysql_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
3822 if (ndbcluster_binlog_terminating)
3823 {
3824 mysql_mutex_unlock(&injector_mutex);
3825 goto err;
3826 }
3827 }
3828 mysql_mutex_unlock(&injector_mutex);
3829
3830 if (thd_ndb == NULL)
3831 {
3832 DBUG_ASSERT(ndbcluster_hton->slot != ~(uint)0);
3833 if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
3834 {
3835 sql_print_error("Could not allocate Thd_ndb object");
3836 goto err;
3837 }
3838 set_thd_ndb(thd, thd_ndb);
3839 thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
3840 thd->query_id= 0; // to keep valgrind quiet
3841 }
3842 }
3843
3844 {
3845 // wait for the first event
3846 thd->proc_info= "Waiting for first event from ndbcluster";
3847 int schema_res, res;
3848 Uint64 schema_gci;
3849 do
3850 {
3851 DBUG_PRINT("info", ("Waiting for the first event"));
3852
3853 if (ndbcluster_binlog_terminating)
3854 goto err;
3855
3856 schema_res= s_ndb->pollEvents(100, &schema_gci);
3857 } while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci);
3858 if (ndb_binlog_running)
3859 {
3860 Uint64 gci= i_ndb->getLatestGCI();
3861 while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch)
3862 {
3863 if (ndbcluster_binlog_terminating)
3864 goto err;
3865 res= i_ndb->pollEvents(10, &gci);
3866 }
3867 if (gci > schema_gci)
3868 {
3869 schema_gci= gci;
3870 }
3871 }
3872 // now check that we have epochs consistant with what we had before the restart
3873 DBUG_PRINT("info", ("schema_res: %d schema_gci: %lu", schema_res,
3874 (long) schema_gci));
3875 {
3876 i_ndb->flushIncompleteEvents(schema_gci);
3877 s_ndb->flushIncompleteEvents(schema_gci);
3878 if (schema_gci < ndb_latest_handled_binlog_epoch)
3879 {
3880 sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
3881 "ndb_latest_handled_binlog_epoch: %u, while current epoch: %u. "
3882 "RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.",
3883 (unsigned) ndb_latest_handled_binlog_epoch, (unsigned) schema_gci);
3884 *p_latest_trans_gci= 0;
3885 ndb_latest_handled_binlog_epoch= 0;
3886 ndb_latest_applied_binlog_epoch= 0;
3887 ndb_latest_received_binlog_epoch= 0;
3888 }
3889 else if (ndb_latest_applied_binlog_epoch > 0)
3890 {
3891 sql_print_warning("NDB Binlog: cluster has reconnected. "
3892 "Changes to the database that occured while "
3893 "disconnected will not be in the binlog");
3894 }
3895 if (opt_ndb_extra_logging)
3896 {
3897 sql_print_information("NDB Binlog: starting log at epoch %u",
3898 (unsigned)schema_gci);
3899 }
3900 }
3901 }
3902 {
3903 static char db[]= "";
3904 thd->db= db;
3905 }
3906 do_ndbcluster_binlog_close_connection= BCCC_running;
3907 for ( ; !((ndbcluster_binlog_terminating ||
3908 do_ndbcluster_binlog_close_connection) &&
3909 ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci) &&
3910 do_ndbcluster_binlog_close_connection != BCCC_restart; )
3911 {
3912 #ifndef DBUG_OFF
3913 if (do_ndbcluster_binlog_close_connection)
3914 {
3915 DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection: %d, "
3916 "ndb_latest_handled_binlog_epoch: %lu, "
3917 "*p_latest_trans_gci: %lu",
3918 do_ndbcluster_binlog_close_connection,
3919 (ulong) ndb_latest_handled_binlog_epoch,
3920 (ulong) *p_latest_trans_gci));
3921 }
3922 #endif
3923 #ifdef RUN_NDB_BINLOG_TIMER
3924 main_timer.stop();
3925 sql_print_information("main_timer %ld ms", main_timer.elapsed_ms());
3926 main_timer.start();
3927 #endif
3928
3929 /*
3930 now we don't want any events before next gci is complete
3931 */
3932 thd->proc_info= "Waiting for event from ndbcluster";
3933 thd->set_time();
3934
3935 /* wait for event or 1000 ms */
3936 Uint64 gci= 0, schema_gci;
3937 int res= 0, tot_poll_wait= 1000;
3938 if (ndb_binlog_running)
3939 {
3940 res= i_ndb->pollEvents(tot_poll_wait, &gci);
3941 tot_poll_wait= 0;
3942 }
3943 else
3944 {
3945 /*
3946 Just consume any events, not used if no binlogging
3947 e.g. node failure events
3948 */
3949 Uint64 tmp_gci;
3950 if (i_ndb->pollEvents(0, &tmp_gci))
3951 while (i_ndb->nextEvent())
3952 ;
3953 }
3954 int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
3955 ndb_latest_received_binlog_epoch= gci;
3956
3957 while (gci > schema_gci && schema_res >= 0)
3958 {
3959 static char buf[64];
3960 thd->proc_info= "Waiting for schema epoch";
3961 my_snprintf(buf, sizeof(buf), "%s %u(%u)", thd->proc_info, (unsigned) schema_gci, (unsigned) gci);
3962 thd->proc_info= buf;
3963 schema_res= s_ndb->pollEvents(10, &schema_gci);
3964 }
3965
3966 if ((ndbcluster_binlog_terminating ||
3967 do_ndbcluster_binlog_close_connection) &&
3968 (ndb_latest_handled_binlog_epoch >= *p_latest_trans_gci ||
3969 !ndb_binlog_running))
3970 break; /* Shutting down server */
3971
3972 if (ndb_binlog_index && ndb_binlog_index->s->has_old_version())
3973 {
3974 if (ndb_binlog_index->s->has_old_version())
3975 {
3976 trans_commit_stmt(thd);
3977 close_thread_tables(thd);
3978 thd->mdl_context.release_transactional_locks();
3979 ndb_binlog_index= 0;
3980 }
3981 }
3982
3983 MEM_ROOT **root_ptr=
3984 my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
3985 MEM_ROOT *old_root= *root_ptr;
3986 MEM_ROOT mem_root;
3987 init_sql_alloc(&mem_root, 4096, 0);
3988 List<Cluster_schema> post_epoch_log_list;
3989 List<Cluster_schema> post_epoch_unlock_list;
3990 *root_ptr= &mem_root;
3991
3992 if (unlikely(schema_res > 0))
3993 {
3994 thd->proc_info= "Processing events from schema table";
3995 s_ndb->
3996 setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
3997 s_ndb->
3998 setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
3999 NdbEventOperation *pOp= s_ndb->nextEvent();
4000 while (pOp != NULL)
4001 {
4002 if (!pOp->hasError())
4003 {
4004 ndb_binlog_thread_handle_schema_event(thd, s_ndb, pOp,
4005 &post_epoch_log_list,
4006 &post_epoch_unlock_list,
4007 &mem_root);
4008 DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
4009 s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
4010 "<empty>"));
4011 DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
4012 i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
4013 "<empty>"));
4014 if (i_ndb->getEventOperation() == NULL &&
4015 s_ndb->getEventOperation() == NULL &&
4016 do_ndbcluster_binlog_close_connection == BCCC_running)
4017 {
4018 DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
4019 do_ndbcluster_binlog_close_connection= BCCC_restart;
4020 if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running)
4021 {
4022 sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
4023 "as latest received epoch is %lu",
4024 (ulong) *p_latest_trans_gci,
4025 (ulong) ndb_latest_received_binlog_epoch);
4026 }
4027 }
4028 }
4029 else
4030 sql_print_error("NDB: error %lu (%s) on handling "
4031 "binlog schema event",
4032 (ulong) pOp->getNdbError().code,
4033 pOp->getNdbError().message);
4034 pOp= s_ndb->nextEvent();
4035 }
4036 }
4037
4038 if (res > 0)
4039 {
4040 DBUG_PRINT("info", ("pollEvents res: %d", res));
4041 thd->proc_info= "Processing events";
4042 NdbEventOperation *pOp= i_ndb->nextEvent();
4043 ndb_binlog_index_row row;
4044 while (pOp != NULL)
4045 {
4046 #ifdef RUN_NDB_BINLOG_TIMER
4047 Timer gci_timer, write_timer;
4048 int event_count= 0;
4049 gci_timer.start();
4050 #endif
4051 gci= pOp->getGCI();
4052 DBUG_PRINT("info", ("Handling gci: %d", (unsigned)gci));
4053 // sometimes get TE_ALTER with invalid table
4054 DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
4055 ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
4056 DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch);
4057
4058 /* initialize some variables for this epoch */
4059 g_ndb_log_slave_updates= opt_log_slave_updates;
4060 i_ndb->
4061 setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
4062 i_ndb->setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
4063
4064 bzero((char*) &row, sizeof(row));
4065 thd->variables.character_set_client= &my_charset_latin1;
4066 injector::transaction trans;
4067 // pass table map before epoch
4068 {
4069 Uint32 iter= 0;
4070 const NdbEventOperation *gci_op;
4071 Uint32 event_types;
4072 while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
4073 != NULL)
4074 {
4075 NDB_SHARE *share= (NDB_SHARE*)gci_op->getCustomData();
4076 DBUG_PRINT("info", ("per gci_op: 0x%lx share: 0x%lx event_types: 0x%x",
4077 (long) gci_op, (long) share, event_types));
4078 // workaround for interface returning TE_STOP events
4079 // which are normally filtered out below in the nextEvent loop
4080 if ((event_types & ~NdbDictionary::Event::TE_STOP) == 0)
4081 {
4082 DBUG_PRINT("info", ("Skipped TE_STOP on table %s",
4083 gci_op->getEvent()->getTable()->getName()));
4084 continue;
4085 }
4086 // this should not happen
4087 if (share == NULL || share->table == NULL)
4088 {
4089 DBUG_PRINT("info", ("no share or table %s!",
4090 gci_op->getEvent()->getTable()->getName()));
4091 continue;
4092 }
4093 if (share == ndb_apply_status_share)
4094 {
4095 // skip this table, it is handled specially
4096 continue;
4097 }
4098 TABLE *table= share->table;
4099 #ifndef DBUG_OFF
4100 const LEX_STRING &name= table->s->table_name;
4101 #endif
4102 if ((event_types & (NdbDictionary::Event::TE_INSERT |
4103 NdbDictionary::Event::TE_UPDATE |
4104 NdbDictionary::Event::TE_DELETE)) == 0)
4105 {
4106 DBUG_PRINT("info", ("skipping non data event table: %.*s",
4107 (int) name.length, name.str));
4108 continue;
4109 }
4110 if (!trans.good())
4111 {
4112 DBUG_PRINT("info",
4113 ("Found new data event, initializing transaction"));
4114 inj->new_trans(thd, &trans);
4115 }
4116 DBUG_PRINT("info", ("use_table: %.*s",
4117 (int) name.length, name.str));
4118 injector::transaction::table tbl(table, TRUE);
4119 int ret __attribute__((unused))= trans.use_table(::server_id, tbl);
4120 DBUG_ASSERT(ret == 0);
4121 }
4122 }
4123 if (trans.good())
4124 {
4125 if (ndb_apply_status_share)
4126 {
4127 TABLE *table= ndb_apply_status_share->table;
4128
4129 #ifndef DBUG_OFF
4130 const LEX_STRING& name= table->s->table_name;
4131 DBUG_PRINT("info", ("use_table: %.*s",
4132 (int) name.length, name.str));
4133 #endif
4134 injector::transaction::table tbl(table, TRUE);
4135 int ret __attribute__((unused))= trans.use_table(::server_id, tbl);
4136 DBUG_ASSERT(ret == 0);
4137
4138 /*
4139 Intialize table->record[0]
4140 */
4141 empty_record(table);
4142
4143 table->field[0]->store((longlong)::server_id);
4144 table->field[1]->store((longlong)gci);
4145 table->field[2]->store("", 0, &my_charset_bin);
4146 table->field[3]->store((longlong)0);
4147 table->field[4]->store((longlong)0);
4148 trans.write_row(::server_id,
4149 injector::transaction::table(table, TRUE),
4150 &table->s->all_set, table->s->fields,
4151 table->record[0]);
4152 }
4153 else
4154 {
4155 sql_print_error("NDB: Could not get apply status share");
4156 }
4157 }
4158 #ifdef RUN_NDB_BINLOG_TIMER
4159 write_timer.start();
4160 #endif
4161 do
4162 {
4163 #ifdef RUN_NDB_BINLOG_TIMER
4164 event_count++;
4165 #endif
4166 if (pOp->hasError() &&
4167 ndb_binlog_thread_handle_error(i_ndb, pOp, row) < 0)
4168 goto err;
4169
4170 #ifndef DBUG_OFF
4171 {
4172 NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
4173 DBUG_PRINT("info",
4174 ("EVENT TYPE: %d GCI: %ld last applied: %ld "
4175 "share: 0x%lx (%s.%s)", pOp->getEventType(),
4176 (long) gci,
4177 (long) ndb_latest_applied_binlog_epoch,
4178 (long) share,
4179 share ? share->db : "'NULL'",
4180 share ? share->table_name : "'NULL'"));
4181 DBUG_ASSERT(share != 0);
4182 }
4183 // assert that there is consistancy between gci op list
4184 // and event list
4185 {
4186 Uint32 iter= 0;
4187 const NdbEventOperation *gci_op;
4188 Uint32 event_types;
4189 while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
4190 != NULL)
4191 {
4192 if (gci_op == pOp)
4193 break;
4194 }
4195 DBUG_ASSERT(gci_op == pOp);
4196 DBUG_ASSERT((event_types & pOp->getEventType()) != 0);
4197 }
4198 #endif
4199 if ((unsigned) pOp->getEventType() <
4200 (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
4201 ndb_binlog_thread_handle_data_event(i_ndb, pOp, row, trans);
4202 else
4203 {
4204 // set injector_ndb database/schema from table internal name
4205 int ret __attribute__((unused))=
4206 i_ndb->setDatabaseAndSchemaName(pOp->getEvent()->getTable());
4207 DBUG_ASSERT(ret == 0);
4208 ndb_binlog_thread_handle_non_data_event(thd, i_ndb, pOp, row);
4209 // reset to catch errors
4210 i_ndb->setDatabaseName("");
4211 DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
4212 s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
4213 "<empty>"));
4214 DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
4215 i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
4216 "<empty>"));
4217 if (i_ndb->getEventOperation() == NULL &&
4218 s_ndb->getEventOperation() == NULL &&
4219 do_ndbcluster_binlog_close_connection == BCCC_running)
4220 {
4221 DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
4222 do_ndbcluster_binlog_close_connection= BCCC_restart;
4223 if (ndb_latest_received_binlog_epoch < *p_latest_trans_gci && ndb_binlog_running)
4224 {
4225 sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
4226 "as latest received epoch is %lu",
4227 (ulong) *p_latest_trans_gci,
4228 (ulong) ndb_latest_received_binlog_epoch);
4229 }
4230 }
4231 }
4232
4233 pOp= i_ndb->nextEvent();
4234 } while (pOp && pOp->getGCI() == gci);
4235
4236 /*
4237 note! pOp is not referring to an event in the next epoch
4238 or is == 0
4239 */
4240 #ifdef RUN_NDB_BINLOG_TIMER
4241 write_timer.stop();
4242 #endif
4243
4244 if (trans.good())
4245 {
4246 //DBUG_ASSERT(row.n_inserts || row.n_updates || row.n_deletes);
4247 thd->proc_info= "Committing events to binlog";
4248 injector::transaction::binlog_pos start= trans.start_pos();
4249 if (int r= trans.commit())
4250 {
4251 sql_print_error("NDB Binlog: "
4252 "Error during COMMIT of GCI. Error: %d",
4253 r);
4254 /* TODO: Further handling? */
4255 }
4256 row.gci= gci;
4257 row.master_log_file= start.file_name();
4258 row.master_log_pos= start.file_pos();
4259
4260 DBUG_PRINT("info", ("COMMIT gci: %lu", (ulong) gci));
4261 if (ndb_update_ndb_binlog_index)
4262 ndb_add_ndb_binlog_index(thd, &row);
4263 ndb_latest_applied_binlog_epoch= gci;
4264 }
4265 ndb_latest_handled_binlog_epoch= gci;
4266 #ifdef RUN_NDB_BINLOG_TIMER
4267 gci_timer.stop();
4268 sql_print_information("gci %ld event_count %d write time "
4269 "%ld(%d e/s), total time %ld(%d e/s)",
4270 (ulong)gci, event_count,
4271 write_timer.elapsed_ms(),
4272 (1000*event_count) / write_timer.elapsed_ms(),
4273 gci_timer.elapsed_ms(),
4274 (1000*event_count) / gci_timer.elapsed_ms());
4275 #endif
4276 }
4277 }
4278
4279 ndb_binlog_thread_handle_schema_event_post_epoch(thd,
4280 &post_epoch_log_list,
4281 &post_epoch_unlock_list);
4282 free_root(&mem_root, MYF(0));
4283 *root_ptr= old_root;
4284 ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
4285 }
4286 if (do_ndbcluster_binlog_close_connection == BCCC_restart)
4287 {
4288 ndb_binlog_tables_inited= FALSE;
4289 trans_commit_stmt(thd);
4290 close_thread_tables(thd);
4291 thd->mdl_context.release_transactional_locks();
4292 ndb_binlog_index= 0;
4293 goto restart;
4294 }
4295 err:
4296 sql_print_information("Stopping Cluster Binlog");
4297 DBUG_PRINT("info",("Shutting down cluster binlog thread"));
4298 thd->proc_info= "Shutting down";
4299 thd->stmt_da->can_overwrite_status= TRUE;
4300 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
4301 thd->stmt_da->can_overwrite_status= FALSE;
4302 close_thread_tables(thd);
4303 thd->mdl_context.release_transactional_locks();
4304 mysql_mutex_lock(&injector_mutex);
4305 /* don't mess with the injector_ndb anymore from other threads */
4306 injector_thd= 0;
4307 injector_ndb= 0;
4308 p_latest_trans_gci= 0;
4309 schema_ndb= 0;
4310 mysql_mutex_unlock(&injector_mutex);
4311 thd->db= 0; // as not to try to free memory
4312
4313 if (ndb_apply_status_share)
4314 {
4315 /* ndb_share reference binlog extra free */
4316 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
4317 ndb_apply_status_share->key,
4318 ndb_apply_status_share->use_count));
4319 free_share(&ndb_apply_status_share);
4320 ndb_apply_status_share= 0;
4321 }
4322 if (ndb_schema_share)
4323 {
4324 /* begin protect ndb_schema_share */
4325 mysql_mutex_lock(&ndb_schema_share_mutex);
4326 /* ndb_share reference binlog extra free */
4327 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
4328 ndb_schema_share->key,
4329 ndb_schema_share->use_count));
4330 free_share(&ndb_schema_share);
4331 ndb_schema_share= 0;
4332 ndb_binlog_tables_inited= 0;
4333 mysql_mutex_unlock(&ndb_schema_share_mutex);
4334 /* end protect ndb_schema_share */
4335 }
4336
4337 /* remove all event operations */
4338 if (s_ndb)
4339 {
4340 NdbEventOperation *op;
4341 DBUG_PRINT("info",("removing all event operations"));
4342 while ((op= s_ndb->getEventOperation()))
4343 {
4344 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
4345 DBUG_PRINT("info",("removing event operation on %s",
4346 op->getEvent()->getName()));
4347 NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
4348 DBUG_ASSERT(share != 0);
4349 DBUG_ASSERT(share->op == op ||
4350 share->op_old == op);
4351 share->op= share->op_old= 0;
4352 /* ndb_share reference binlog free */
4353 DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
4354 share->key, share->use_count));
4355 free_share(&share);
4356 s_ndb->dropEventOperation(op);
4357 }
4358 delete s_ndb;
4359 s_ndb= 0;
4360 }
4361 if (i_ndb)
4362 {
4363 NdbEventOperation *op;
4364 DBUG_PRINT("info",("removing all event operations"));
4365 while ((op= i_ndb->getEventOperation()))
4366 {
4367 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
4368 DBUG_PRINT("info",("removing event operation on %s",
4369 op->getEvent()->getName()));
4370 NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
4371 DBUG_ASSERT(share != 0);
4372 DBUG_ASSERT(share->op == op ||
4373 share->op_old == op);
4374 share->op= share->op_old= 0;
4375 /* ndb_share reference binlog free */
4376 DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
4377 share->key, share->use_count));
4378 free_share(&share);
4379 i_ndb->dropEventOperation(op);
4380 }
4381 delete i_ndb;
4382 i_ndb= 0;
4383 }
4384
4385 my_hash_free(&ndb_schema_objects);
4386
4387 net_end(&thd->net);
4388 thd->cleanup();
4389 delete thd;
4390
4391 ndb_binlog_thread_running= -1;
4392 ndb_binlog_running= FALSE;
4393 mysql_cond_signal(&injector_cond);
4394
4395 DBUG_PRINT("exit", ("ndb_binlog_thread"));
4396
4397 DBUG_LEAVE; // Must match DBUG_ENTER()
4398 my_thread_end();
4399 pthread_exit(0);
4400 return NULL; // Avoid compiler warnings
4401 }
4402
4403 bool
ndbcluster_show_status_binlog(THD * thd,stat_print_fn * stat_print,enum ha_stat_type stat_type)4404 ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
4405 enum ha_stat_type stat_type)
4406 {
4407 char buf[IO_SIZE];
4408 uint buflen;
4409 ulonglong ndb_latest_epoch= 0;
4410 DBUG_ENTER("ndbcluster_show_status_binlog");
4411
4412 mysql_mutex_lock(&injector_mutex);
4413 if (injector_ndb)
4414 {
4415 char buff1[22],buff2[22],buff3[22],buff4[22],buff5[22];
4416 ndb_latest_epoch= injector_ndb->getLatestGCI();
4417 mysql_mutex_unlock(&injector_mutex);
4418
4419 buflen=
4420 snprintf(buf, sizeof(buf),
4421 "latest_epoch=%s, "
4422 "latest_trans_epoch=%s, "
4423 "latest_received_binlog_epoch=%s, "
4424 "latest_handled_binlog_epoch=%s, "
4425 "latest_applied_binlog_epoch=%s",
4426 llstr(ndb_latest_epoch, buff1),
4427 llstr(*p_latest_trans_gci, buff2),
4428 llstr(ndb_latest_received_binlog_epoch, buff3),
4429 llstr(ndb_latest_handled_binlog_epoch, buff4),
4430 llstr(ndb_latest_applied_binlog_epoch, buff5));
4431 if (stat_print(thd, ndbcluster_hton_name, ndbcluster_hton_name_length,
4432 "binlog", strlen("binlog"),
4433 buf, buflen))
4434 DBUG_RETURN(TRUE);
4435 }
4436 else
4437 mysql_mutex_unlock(&injector_mutex);
4438 DBUG_RETURN(FALSE);
4439 }
4440
4441 #endif /* HAVE_NDB_BINLOG */
4442 #endif
4443