1 /*
2 Copyright (c) 2000, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "ha_ndbcluster_glue.h"
26 #include "ha_ndbcluster.h"
27 #include "ha_ndbcluster_connection.h"
28 #include "ndb_local_connection.h"
29 #include "ndb_thd.h"
30 #include "ndb_table_guard.h"
31 #include "ndb_global_schema_lock.h"
32 #include "ndb_global_schema_lock_guard.h"
33 #include "ndb_tdc.h"
34 #include "ndb_name_util.h"
35
36 #include "rpl_injector.h"
37 #include "rpl_filter.h"
38 #if MYSQL_VERSION_ID > 50600
39 #include "rpl_slave.h"
40 #else
41 #include "slave.h"
42 #include "log_event.h"
43 #endif
44 #include "binlog.h"
45 #include "ha_ndbcluster_binlog.h"
46 #include <ndbapi/NdbDictionary.hpp>
47 #include <ndbapi/ndb_cluster_connection.hpp>
48 #include "mysqld_thd_manager.h" // Global_THD_manager
49
50 extern my_bool opt_ndb_log_orig;
51 extern my_bool opt_ndb_log_bin;
52 extern my_bool opt_ndb_log_update_as_write;
53 extern my_bool opt_ndb_log_updated_only;
54 extern my_bool opt_ndb_log_update_minimal;
55 extern my_bool opt_ndb_log_binlog_index;
56 extern my_bool opt_ndb_log_apply_status;
57 extern ulong opt_ndb_extra_logging;
58 extern st_ndb_slave_state g_ndb_slave_state;
59 extern my_bool opt_ndb_log_transaction_id;
60 extern my_bool log_bin_use_v1_row_events;
61 extern my_bool opt_ndb_log_empty_update;
62
63 bool ndb_log_empty_epochs(void);
64
65 void ndb_index_stat_restart();
66
67 /*
68 defines for cluster replication table names
69 */
70 #include "ha_ndbcluster_tables.h"
71
72 #include "ndb_dist_priv_util.h"
73 #include "ndb_anyvalue.h"
74 #include "ndb_binlog_extra_row_info.h"
75 #include "ndb_event_data.h"
76 #include "ndb_schema_object.h"
77 #include "ndb_schema_dist.h"
78 #include "ndb_repl_tab.h"
79 #include "ndb_binlog_thread.h"
80 #include "ndb_find_files_list.h"
81
82 /*
83 Timeout for syncing schema events between
84 mysql servers, and between mysql server and the binlog
85 */
86 static const int DEFAULT_SYNC_TIMEOUT= 120;
87
88 /* Column numbers in the ndb_binlog_index table */
89 enum Ndb_binlog_index_cols
90 {
91 NBICOL_START_POS = 0
92 ,NBICOL_START_FILE = 1
93 ,NBICOL_EPOCH = 2
94 ,NBICOL_NUM_INSERTS = 3
95 ,NBICOL_NUM_UPDATES = 4
96 ,NBICOL_NUM_DELETES = 5
97 ,NBICOL_NUM_SCHEMAOPS = 6
98 /* Following colums in schema 'v2' */
99 ,NBICOL_ORIG_SERVERID = 7
100 ,NBICOL_ORIG_EPOCH = 8
101 ,NBICOL_GCI = 9
102 /* Following columns in schema 'v3' */
103 ,NBICOL_NEXT_POS = 10
104 ,NBICOL_NEXT_FILE = 11
105 };
106
107 /*
108 Flag showing if the ndb binlog should be created, if so == TRUE
109 FALSE if not
110 */
111 my_bool ndb_binlog_running= FALSE;
112 static my_bool ndb_binlog_tables_inited= FALSE;
113 static my_bool ndb_binlog_is_ready= FALSE;
114
115 bool
ndb_binlog_is_read_only(void)116 ndb_binlog_is_read_only(void)
117 {
118 if(!ndb_binlog_tables_inited)
119 {
120 /* the ndb_* system tables not setup yet */
121 return true;
122 }
123
124 if (ndb_binlog_running && !ndb_binlog_is_ready)
125 {
126 /*
127 The binlog thread is supposed to write to binlog
128 but not ready (still initializing or has lost connection)
129 */
130 return true;
131 }
132 return false;
133 }
134
135 /*
136 Global reference to the ndb injector thread THD oject
137
138 Has one sole purpose, for setting the in_use table member variable
139 in get_share(...)
140 */
141 extern THD * injector_thd; // Declared in ha_ndbcluster.cc
142
143 /*
144 Global reference to ndb injector thd object.
145
146 Used mainly by the binlog index thread, but exposed to the client sql
147 thread for one reason; to setup the events operations for a table
148 to enable ndb injector thread receiving events.
149
150 Must therefore always be used with a surrounding
151 native_mutex_lock(&injector_mutex), when doing create/dropEventOperation
152 */
153 static Ndb *injector_ndb= 0;
154 static Ndb *schema_ndb= 0;
155
156 static int ndbcluster_binlog_inited= 0;
157
158 /*
159 Mutex and condition used for interacting between client sql thread
160 and injector thread
161 */
162 static native_mutex_t injector_mutex;
163 static native_cond_t injector_cond;
164
165 /* NDB Injector thread (used for binlog creation) */
166 static ulonglong ndb_latest_applied_binlog_epoch= 0;
167 static ulonglong ndb_latest_handled_binlog_epoch= 0;
168 static ulonglong ndb_latest_received_binlog_epoch= 0;
169
170 NDB_SHARE *ndb_apply_status_share= 0;
171 NDB_SHARE *ndb_schema_share= 0;
172 static native_mutex_t ndb_schema_share_mutex;
173
174 extern my_bool opt_log_slave_updates;
175 static my_bool g_ndb_log_slave_updates;
176
177 static bool g_injector_v1_warning_emitted = false;
178
179 #ifndef NDEBUG
print_records(TABLE * table,const uchar * record)180 static void print_records(TABLE *table, const uchar *record)
181 {
182 for (uint j= 0; j < table->s->fields; j++)
183 {
184 char buf[40];
185 int pos= 0;
186 Field *field= table->field[j];
187 const uchar* field_ptr= field->ptr - table->record[0] + record;
188 int pack_len= field->pack_length();
189 int n= pack_len < 10 ? pack_len : 10;
190
191 for (int i= 0; i < n && pos < 20; i++)
192 {
193 pos+= sprintf(&buf[pos]," %x", (int) (uchar) field_ptr[i]);
194 }
195 buf[pos]= 0;
196 DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
197 }
198 }
199 #else
200 #define print_records(a,b)
201 #endif
202
203
204 #ifndef NDEBUG
/*
  Debug helper: print the layout of 'table' (share info plus one line
  per field with flags, type, packing and null-bit details) to the
  DBUG trace. No-op output unless the "info" DBUG keyword is enabled.
*/
static void dbug_print_table(const char *info, TABLE *table)
{
  if (table == 0)
  {
    DBUG_PRINT("info",("%s: (null)", info));
    return;
  }
  DBUG_PRINT("info",
             ("%s: %s.%s s->fields: %d "
              "reclength: %lu rec_buff_length: %u record[0]: 0x%lx "
              "record[1]: 0x%lx",
              info,
              table->s->db.str,
              table->s->table_name.str,
              table->s->fields,
              table->s->reclength,
              table->s->rec_buff_length,
              (long) table->record[0],
              (long) table->record[1]));

  /* One trace line per field describing flags, type and placement */
  for (unsigned int i= 0; i < table->s->fields; i++)
  {
    Field *f= table->field[i];
    DBUG_PRINT("info",
               ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
                "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
                i,
                f->field_name,
                (long) f->flags,
                (f->flags & PRI_KEY_FLAG) ? "pri" : "attr",
                (f->flags & NOT_NULL_FLAG) ? "" : ",nullable",
                (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
                (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
                (f->flags & BLOB_FLAG) ? ",blob" : "",
                (f->flags & BINARY_FLAG) ? ",binary" : "",
                f->real_type(),
                f->pack_length(),
                (long) f->ptr, (int) (f->ptr - table->record[0]),
                f->null_bit,
                /* NOTE(review): null_offset(0) looks unusual -- upstream
                   variants pass table->record[0] here; confirm against the
                   Field::null_offset() overloads before changing */
                (long) f->null_offset(0),
                (int) f->null_offset()));
    /* BIT fields store some bits separately; print that extra layout */
    if (f->type() == MYSQL_TYPE_BIT)
    {
      Field_bit *g= (Field_bit*) f;
      DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] "
                                   "bit_ofs: %d bit_len: %u",
                                   g->field_length, (long) g->bit_ptr,
                                   (int) ((uchar*) g->bit_ptr -
                                          table->record[0]),
                                   g->bit_ofs, g->bit_len));
    }
  }
}
258 #else
259 #define dbug_print_table(a,b)
260 #endif
261
262
run_query(THD * thd,char * buf,char * end,const int * no_print_error)263 static void run_query(THD *thd, char *buf, char *end,
264 const int *no_print_error)
265 {
266 /*
267 NOTE! Don't use this function for new implementation, backward
268 compat. only
269 */
270
271 Ndb_local_connection mysqld(thd);
272
273 /*
274 Run the query, suppress some errors from being printed
275 to log and ignore any error returned
276 */
277 (void)mysqld.raw_run_query(buf, (end - buf),
278 no_print_error);
279 }
280
281 static void
ndb_binlog_close_shadow_table(NDB_SHARE * share)282 ndb_binlog_close_shadow_table(NDB_SHARE *share)
283 {
284 DBUG_ENTER("ndb_binlog_close_shadow_table");
285 Ndb_event_data *event_data= share->event_data;
286 if (event_data)
287 {
288 delete event_data;
289 share->event_data= 0;
290 }
291 DBUG_VOID_RETURN;
292 }
293
294
295 /*
296 Open a shadow table for the table given in share.
297 - The shadow table is (mainly) used when an event is
298 received from the data nodes which need to be written
299 to the binlog injector.
300 */
301
static int
ndb_binlog_open_shadow_table(THD *thd, NDB_SHARE *share)
{
  int error;
  assert(share->event_data == 0);
  // The Ndb_event_data owns the shadow table and its MEM_ROOT
  Ndb_event_data *event_data= share->event_data= new Ndb_event_data(share);
  DBUG_ENTER("ndb_binlog_open_shadow_table");

  // Temporarily switch this thread's allocator root to the
  // event_data MEM_ROOT so all allocations below live with it;
  // restored before every return
  MEM_ROOT **root_ptr= my_thread_get_THR_MALLOC();
  MEM_ROOT *old_root= *root_ptr;
  init_sql_alloc(PSI_INSTRUMENT_ME, &event_data->mem_root, 1024, 0);
  *root_ptr= &event_data->mem_root;

  TABLE_SHARE *shadow_table_share=
    (TABLE_SHARE*)alloc_root(&event_data->mem_root, sizeof(TABLE_SHARE));
  TABLE *shadow_table=
    (TABLE*)alloc_root(&event_data->mem_root, sizeof(TABLE));

  init_tmp_table_share(thd, shadow_table_share,
                       share->db, 0,
                       share->table_name,
                       share->key_string());
  // Open the table definition (.frm) and instantiate the TABLE;
  // on failure undo everything allocated above
  if ((error= open_table_def(thd, shadow_table_share, 0)) ||
      (error= open_table_from_share(thd, shadow_table_share, "", 0,
                                    (uint) (OPEN_FRM_FILE_ONLY | DELAYED_OPEN | READ_ALL),
                                    0, shadow_table,
                                    false
                                    )))
  {
    DBUG_PRINT("error", ("failed to open shadow table, error: %d my_errno: %d",
                         error, my_errno()));
    free_table_share(shadow_table_share);
    delete event_data;
    share->event_data= 0;
    *root_ptr= old_root;  // restore the thread's MEM_ROOT
    DBUG_RETURN(error);
  }
  event_data->shadow_table= shadow_table;

  // Table ids must be assigned under LOCK_open
  mysql_mutex_lock(&LOCK_open);
  assign_new_table_id(shadow_table_share);
  mysql_mutex_unlock(&LOCK_open);

  // The shadow table is used by the injector thread
  shadow_table->in_use= injector_thd;


  // Allocate strings for db and table_name for shadow_table
  // in event_data's MEM_ROOT(where the shadow_table itself is allocated)
  lex_string_copy(&event_data->mem_root,
                  &shadow_table->s->db,
                  share->db);
  lex_string_copy(&event_data->mem_root,
                  &shadow_table->s->table_name,
                  share->table_name);

  /* We can't use 'use_all_columns()' as the file object is not setup yet */
  shadow_table->column_bitmaps_set_no_signal(&shadow_table->s->all_set,
                                             &shadow_table->s->all_set);

  // Propagate table properties into the share's flags
  if (shadow_table->s->primary_key == MAX_KEY)
    share->flags|= NSF_HIDDEN_PK;

  if (shadow_table->s->blob_fields != 0)
    share->flags|= NSF_BLOB_FLAG;

  event_data->init_pk_bitmap();

#ifndef NDEBUG
  dbug_print_table("table", shadow_table);
#endif
  *root_ptr= old_root;  // restore the thread's MEM_ROOT
  DBUG_RETURN(0);
}
375
376
377 /*
378 Initialize the binlog part of the NDB_SHARE
379 */
ndbcluster_binlog_init_share(THD * thd,NDB_SHARE * share,TABLE * _table)380 int ndbcluster_binlog_init_share(THD *thd, NDB_SHARE *share, TABLE *_table)
381 {
382 DBUG_ENTER("ndbcluster_binlog_init_share");
383
384 if (!share->need_events(ndb_binlog_running))
385 {
386 if (_table)
387 {
388 if (_table->s->primary_key == MAX_KEY)
389 share->flags|= NSF_HIDDEN_PK;
390 if (_table->s->blob_fields != 0)
391 share->flags|= NSF_BLOB_FLAG;
392 }
393 else
394 {
395 share->flags|= NSF_NO_BINLOG;
396 }
397 DBUG_RETURN(0);
398 }
399
400 DBUG_RETURN(ndb_binlog_open_shadow_table(thd, share));
401 }
402
/*
  Read all blob values of a row into 'buffer' and point each
  Field_blob of 'table' at its data.

  Two passes over the fields:
    pass 0 - sum up the required (Uint64-aligned) buffer size and
             (re)allocate 'buffer' if too small
    pass 1 - read the blob data and hook each Field_blob up to its
             position in the buffer, offset by 'ptrdiff' so the same
             fields can describe either record[0] or record[1]

  Returns 0 on success, -1 on NdbBlob error or allocation failure.
*/
static int
get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
                    uchar*& buffer, uint& buffer_size,
                    my_ptrdiff_t ptrdiff)
{
  DBUG_ENTER("get_ndb_blobs_value");

  // Field has no field number so cannot use TABLE blob_field
  // Loop twice, first only counting total buffer size
  for (int loop= 0; loop <= 1; loop++)
  {
    uint32 offset= 0;
    for (uint i= 0; i < table->s->fields; i++)
    {
      Field *field= table->field[i];
      NdbValue value= value_array[i];
      if (! (field->flags & BLOB_FLAG))
        continue;  // only blob fields are handled here
      if (value.blob == NULL)
      {
        DBUG_PRINT("info",("[%u] skipped", i));
        continue;
      }
      Field_blob *field_blob= (Field_blob *)field;
      NdbBlob *ndb_blob= value.blob;
      int isNull;
      if (ndb_blob->getNull(isNull) != 0)
        DBUG_RETURN(-1);
      if (isNull == 0) {
        Uint64 len64= 0;
        if (ndb_blob->getLength(len64) != 0)
          DBUG_RETURN(-1);
        // Align to Uint64
        uint32 size= Uint32(len64);
        if (size % 8 != 0)
          size+= 8 - size % 8;
        if (loop == 1)
        {
          // Second pass: read data into the pre-sized buffer
          uchar *buf= buffer + offset;
          uint32 len= 0xffffffff; // Max uint32
          if (ndb_blob->readData(buf, len) != 0)
            DBUG_RETURN(-1);
          DBUG_PRINT("info", ("[%u] offset: %u buf: 0x%lx len=%u [ptrdiff=%d]",
                              i, offset, (long) buf, len, (int)ptrdiff));
          assert(len == len64);
          // Ugly hack assumes only ptr needs to be changed
          field_blob->set_ptr_offset(ptrdiff, len, buf);
        }
        offset+= size;
      }
      else if (loop == 1) // undefined or null
      {
        // have to set length even in this case
        uchar *buf= buffer + offset; // or maybe NULL
        uint32 len= 0;
        field_blob->set_ptr_offset(ptrdiff, len, buf);
        DBUG_PRINT("info", ("[%u] isNull=%d", i, isNull));
      }
    }
    // After the counting pass: grow the buffer if needed
    if (loop == 0 && offset > buffer_size)
    {
      my_free(buffer);
      buffer_size= 0;
      DBUG_PRINT("info", ("allocate blobs buffer size %u", offset));
      buffer= (uchar*) my_malloc(PSI_INSTRUMENT_ME, offset, MYF(MY_WME));
      if (buffer == NULL)
      {
        sql_print_error("get_ndb_blobs_value: my_malloc(%u) failed", offset);
        DBUG_RETURN(-1);
      }
      buffer_size= offset;
    }
  }
  DBUG_RETURN(0);
}
478
479
480 /*****************************************************************
481 functions called from master sql client threads
482 ****************************************************************/
483
484 /*
485 called in mysql_show_binlog_events and reset_logs to make sure we wait for
486 all events originating from the 'thd' to arrive in the binlog.
487
488 'thd' is expected to be non-NULL.
489
490 Wait for the epoch in which the last transaction of the 'thd' is a part of.
491
492 Wait a maximum of 30 seconds.
493 */
static void ndbcluster_binlog_wait(THD *thd)
{
  if (ndb_binlog_running)
  {
    DBUG_ENTER("ndbcluster_binlog_wait");
    assert(thd);
    // Only called for the statements that need to see their own changes
    assert(thd_sql_command(thd) == SQLCOM_SHOW_BINLOG_EVENTS ||
           thd_sql_command(thd) == SQLCOM_FLUSH ||
           thd_sql_command(thd) == SQLCOM_RESET);
    /*
      Binlog Injector should not wait for itself
    */
    if (thd->system_thread == SYSTEM_THREAD_NDBCLUSTER_BINLOG)
      DBUG_VOID_RETURN;

    Thd_ndb *thd_ndb = get_thd_ndb(thd);
    if (!thd_ndb)
    {
      /*
        thd has not interfaced with ndb before
        so there is no need for waiting
      */
      DBUG_VOID_RETURN;
    }

    const char *save_info = thd->proc_info;
    thd->proc_info = "Waiting for ndbcluster binlog update to "
      "reach current position";

    const Uint64 start_handled_epoch = ndb_latest_handled_binlog_epoch;
    /*
      Highest epoch that a transaction against Ndb has received
      as part of commit processing *in this thread*. This is a
      per-session 'most recent change' indicator.
    */
    const Uint64 session_last_committed_epoch =
      thd_ndb->m_last_commit_epoch_session;

    /*
     * Wait until the last committed epoch from the session enters Binlog.
     * Break any possible deadlock after 30s.
     */
    int count = 30;

    // Wait in 1s intervals; the injector thread broadcasts
    // injector_cond as it advances ndb_latest_handled_binlog_epoch
    native_mutex_lock(&injector_mutex);
    while (!thd->killed && count && ndb_binlog_running &&
           (ndb_latest_handled_binlog_epoch == 0 ||
            ndb_latest_handled_binlog_epoch < session_last_committed_epoch))
    {
      count--;
      struct timespec abstime;
      set_timespec(&abstime, 1);
      native_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
    }
    native_mutex_unlock(&injector_mutex);

    if (count == 0)
    {
      // Timed out: log waited-for epoch plus observed progress
      sql_print_warning("NDB: Thread id %u timed out (30s) waiting for epoch %u/%u "
                        "to be handled.  Progress : %u/%u -> %u/%u.",
                        thd->thread_id(),
                        Uint32((session_last_committed_epoch >> 32) & 0xffffffff),
                        Uint32(session_last_committed_epoch & 0xffffffff),
                        Uint32((start_handled_epoch >> 32) & 0xffffffff),
                        Uint32(start_handled_epoch & 0xffffffff),
                        Uint32((ndb_latest_handled_binlog_epoch >> 32) & 0xffffffff),
                        Uint32(ndb_latest_handled_binlog_epoch & 0xffffffff));

      // Fail on wait/deadlock timeout in debug compile
      assert(false);
    }

    thd->proc_info= save_info;
    DBUG_VOID_RETURN;
  }
}
570
571 /*
572 Called from MYSQL_BIN_LOG::reset_logs in log.cc when binlog is emptied
573 */
ndbcluster_reset_logs(THD * thd)574 static int ndbcluster_reset_logs(THD *thd)
575 {
576 if (!ndb_binlog_running)
577 return 0;
578
579 /* only reset master should reset logs */
580 if (!((thd->lex->sql_command == SQLCOM_RESET) &&
581 (thd->lex->type & REFRESH_MASTER)))
582 return 0;
583
584 DBUG_ENTER("ndbcluster_reset_logs");
585
586 /*
587 Wait for all events originating from this mysql server has
588 reached the binlog before continuing to reset
589 */
590 ndbcluster_binlog_wait(thd);
591
592 /*
593 Truncate mysql.ndb_binlog_index table, if table does not
594 exist ignore the error as it is a "consistent" behavior
595 */
596 Ndb_local_connection mysqld(thd);
597 const bool ignore_no_such_table = true;
598 if(mysqld.truncate_table(STRING_WITH_LEN("mysql"),
599 STRING_WITH_LEN("ndb_binlog_index"),
600 ignore_no_such_table))
601 {
602 // Failed to truncate table
603 DBUG_RETURN(1);
604 }
605 DBUG_RETURN(0);
606 }
607
608 /*
609 Setup THD object
610 'Inspired' from ha_ndbcluster.cc : ndb_util_thread_func
611 */
/*
  Set up a new THD object for use by a background/daemon task.
  'Inspired' from ha_ndbcluster.cc : ndb_util_thread_func.
  Returns NULL on failure (allocation or store_globals error).
  Caller owns the returned THD and must delete it.
*/
THD *
ndb_create_thd(char * stackptr)
{
  DBUG_ENTER("ndb_create_thd");
  THD * thd= new THD; /* note that constructor of THD uses DBUG_ */
  if (thd == 0)
  {
    DBUG_RETURN(0);
  }
  THD_CHECK_SENTRY(thd);

  thd->thread_stack= stackptr; /* remember where our stack is */
  // Attach the THD to this OS thread; must succeed before any use
  if (thd->store_globals())
  {
    delete thd;
    DBUG_RETURN(0);
  }

  thd->init_for_queries();
  thd_set_command(thd, COM_DAEMON);
  // Mark as the ndbcluster binlog system thread (affects waits above)
  thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
#ifndef NDB_THD_HAS_NO_VERSION
  thd->version= refresh_version;
#endif
  thd->get_protocol_classic()->set_client_capabilities(0);
  thd->lex->start_transaction_opt= 0;
  // Internal thread, runs with all privileges
  thd->security_context()->skip_grants();

  // Use utf8 for client/results/connection character sets
  CHARSET_INFO *charset_connection= get_charset_by_csname("utf8",
                                                          MY_CS_PRIMARY,
                                                          MYF(MY_WME));
  thd->variables.character_set_client= charset_connection;
  thd->variables.character_set_results= charset_connection;
  thd->variables.collation_connection= charset_connection;
  thd->update_charset();
  DBUG_RETURN(thd);
}
649
650 /*
651 Called from MYSQL_BIN_LOG::purge_logs in log.cc when the binlog "file"
652 is removed
653 */
654
655 static int
ndbcluster_binlog_index_purge_file(THD * passed_thd,const char * file)656 ndbcluster_binlog_index_purge_file(THD *passed_thd, const char *file)
657 {
658 int stack_base = 0;
659 int error = 0;
660 DBUG_ENTER("ndbcluster_binlog_index_purge_file");
661 DBUG_PRINT("enter", ("file: %s", file));
662
663 if (!ndb_binlog_running || (passed_thd && passed_thd->slave_thread))
664 DBUG_RETURN(0);
665
666 /**
667 * This function cannot safely reuse the passed thd object
668 * due to the variety of places from which it is called.
669 * new/delete one...yuck!
670 */
671 THD* my_thd;
672 if ((my_thd = ndb_create_thd((char*)&stack_base) /* stack ptr */) == 0)
673 {
674 /**
675 * TODO return proper error code here,
676 * BUT! return code is not (currently) checked in
677 * log.cc : purge_index_entry() so we settle for warning printout
678 * Will sql_print_warning fail with no thd?
679 */
680 sql_print_warning("NDB: Unable to purge "
681 NDB_REP_DB "." NDB_REP_TABLE
682 " File=%s (failed to setup thd)", file);
683 DBUG_RETURN(0);
684 }
685
686
687 /*
688 delete rows from mysql.ndb_binlog_index table for the given
689 filename, if table does not exist ignore the error as it
690 is a "consistent" behavior
691 */
692 Ndb_local_connection mysqld(my_thd);
693 const bool ignore_no_such_table = true;
694 if(mysqld.delete_rows(STRING_WITH_LEN("mysql"),
695 STRING_WITH_LEN("ndb_binlog_index"),
696 ignore_no_such_table,
697 "File='", file, "'", NULL))
698 {
699 // Failed to delete rows from table
700 error = 1;
701 }
702
703 delete my_thd;
704
705 if (passed_thd)
706 {
707 /* Relink passed THD with this thread */
708 passed_thd->store_globals();
709 }
710
711 DBUG_RETURN(error);
712 }
713
714
715 // Determine if privilege tables are distributed, ie. stored in NDB
716 bool
priv_tables_are_in_ndb(THD * thd)717 Ndb_dist_priv_util::priv_tables_are_in_ndb(THD* thd)
718 {
719 bool distributed= false;
720 Ndb_dist_priv_util dist_priv;
721 DBUG_ENTER("ndbcluster_distributed_privileges");
722
723 Ndb *ndb= check_ndb_in_thd(thd);
724 if (!ndb)
725 DBUG_RETURN(false); // MAGNUS, error message?
726
727 if (ndb->setDatabaseName(dist_priv.database()) != 0)
728 DBUG_RETURN(false);
729
730 const char* table_name;
731 while((table_name= dist_priv.iter_next_table()))
732 {
733 DBUG_PRINT("info", ("table_name: %s", table_name));
734 Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
735 const NDBTAB *ndbtab= ndbtab_g.get_table();
736 if (ndbtab)
737 {
738 distributed= true;
739 }
740 else if (distributed)
741 {
742 sql_print_error("NDB: Inconsistency detected in distributed "
743 "privilege tables. Table '%s.%s' is not distributed",
744 dist_priv.database(), table_name);
745 DBUG_RETURN(false);
746 }
747 }
748 DBUG_RETURN(distributed);
749 }
750
751
752 /*
753 ndbcluster_binlog_log_query
754
755 - callback function installed in handlerton->binlog_log_query
756 - called by MySQL Server in places where no other handlerton
757 function exists which can be used to notify about changes
758 - used by ndbcluster to detect when
759 -- databases are created or altered
760 -- privilege tables have been modified
761 */
762
763 static void
ndbcluster_binlog_log_query(handlerton * hton,THD * thd,enum_binlog_command binlog_command,const char * query,uint query_length,const char * db,const char * table_name)764 ndbcluster_binlog_log_query(handlerton *hton, THD *thd,
765 enum_binlog_command binlog_command,
766 const char *query, uint query_length,
767 const char *db, const char *table_name)
768 {
769 DBUG_ENTER("ndbcluster_binlog_log_query");
770 DBUG_PRINT("enter", ("db: %s table_name: %s query: %s",
771 db, table_name, query));
772 enum SCHEMA_OP_TYPE type;
773 /* Use random table_id and table_version */
774 const uint32 table_id = (uint32)rand();
775 const uint32 table_version = (uint32)rand();
776 switch (binlog_command)
777 {
778 case LOGCOM_CREATE_DB:
779 DBUG_PRINT("info", ("New database '%s' created", db));
780 type= SOT_CREATE_DB;
781 break;
782
783 case LOGCOM_ALTER_DB:
784 DBUG_PRINT("info", ("The database '%s' was altered", db));
785 type= SOT_ALTER_DB;
786 break;
787
788 case LOGCOM_ACL_NOTIFY:
789 DBUG_PRINT("info", ("Privilege tables have been modified"));
790 type= SOT_GRANT;
791 if (!Ndb_dist_priv_util::priv_tables_are_in_ndb(thd))
792 {
793 DBUG_VOID_RETURN;
794 }
795 /*
796 NOTE! Grant statements with db set to NULL is very rare but
797 may be provoked by for example dropping the currently selected
798 database. Since ndbcluster_log_schema_op does not allow
799 db to be NULL(can't create a key for the ndb_schem_object nor
800 writeNULL to ndb_schema), the situation is salvaged by setting db
801 to the constant string "mysql" which should work in most cases.
802
803 Interestingly enough this "hack" has the effect that grant statements
804 are written to the remote binlog in same format as if db would have
805 been NULL.
806 */
807 if (!db)
808 db = "mysql";
809 break;
810
811 default:
812 DBUG_PRINT("info", ("Ignoring binlog_log_query notification"));
813 DBUG_VOID_RETURN;
814 break;
815
816 }
817 ndbcluster_log_schema_op(thd, query, query_length,
818 db, table_name, table_id, table_version, type,
819 NULL, NULL);
820 DBUG_VOID_RETURN;
821 }
822
823 extern void ndb_util_thread_stop(void);
824
825 // Instantiate Ndb_binlog_thread component
826 static Ndb_binlog_thread ndb_binlog_thread;
827
828
829 /*
830 End use of the NDB Cluster binlog
831 - wait for binlog thread to shutdown
832 */
833
ndbcluster_binlog_end(THD * thd)834 int ndbcluster_binlog_end(THD *thd)
835 {
836 DBUG_ENTER("ndbcluster_binlog_end");
837
838 // Stop ndb_util_thread first since it uses THD(which
839 // implicitly depend on binlog)
840 ndb_util_thread_stop();
841
842 if (ndbcluster_binlog_inited)
843 {
844 ndbcluster_binlog_inited= 0;
845
846 ndb_binlog_thread.stop();
847 ndb_binlog_thread.deinit();
848
849 native_mutex_destroy(&injector_mutex);
850 native_cond_destroy(&injector_cond);
851 native_mutex_destroy(&ndb_schema_share_mutex);
852 }
853
854 DBUG_RETURN(0);
855 }
856
857 /*****************************************************************
858 functions called from slave sql client threads
859 ****************************************************************/
ndbcluster_reset_slave(THD * thd)860 static void ndbcluster_reset_slave(THD *thd)
861 {
862 int error = 0;
863 if (!ndb_binlog_running)
864 return;
865
866 DBUG_ENTER("ndbcluster_reset_slave");
867
868 /*
869 delete all rows from mysql.ndb_apply_status table
870 - if table does not exist ignore the error as it
871 is a consistent behavior
872 */
873 Ndb_local_connection mysqld(thd);
874 const bool ignore_no_such_table = true;
875 if(mysqld.delete_rows(STRING_WITH_LEN("mysql"),
876 STRING_WITH_LEN("ndb_apply_status"),
877 ignore_no_such_table,
878 NULL))
879 {
880 // Failed to delete rows from table
881 error = 1;
882 }
883
884 g_ndb_slave_state.atResetSlave();
885
886 // pending fix for bug#59844 will make this function return int
887 DBUG_VOID_RETURN;
888 }
889
890 /*
891 Initialize the binlog part of the ndb handlerton
892 */
893
ndbcluster_binlog_func(handlerton * hton,THD * thd,enum_binlog_func fn,void * arg)894 static int ndbcluster_binlog_func(handlerton *hton, THD *thd,
895 enum_binlog_func fn,
896 void *arg)
897 {
898 DBUG_ENTER("ndbcluster_binlog_func");
899 int res= 0;
900 switch(fn)
901 {
902 case BFN_RESET_LOGS:
903 res= ndbcluster_reset_logs(thd);
904 break;
905 case BFN_RESET_SLAVE:
906 ndbcluster_reset_slave(thd);
907 break;
908 case BFN_BINLOG_WAIT:
909 ndbcluster_binlog_wait(thd);
910 break;
911 case BFN_BINLOG_END:
912 res= ndbcluster_binlog_end(thd);
913 break;
914 case BFN_BINLOG_PURGE_FILE:
915 res= ndbcluster_binlog_index_purge_file(thd, (const char *)arg);
916 break;
917 }
918 DBUG_RETURN(res);
919 }
920
/*
  Install the binlog related callbacks in the ndbcluster handlerton
*/
void ndbcluster_binlog_init(handlerton* h)
{
  h->binlog_func= ndbcluster_binlog_func;
  h->binlog_log_query= ndbcluster_binlog_log_query;
}
926
927
928 /*
929 Convert db and table name into a key to use for searching
930 the ndbcluster_open_tables hash
931 */
/*
  Convert db and table name into a key "./<db>/<table>" to use for
  searching the ndbcluster_open_tables hash. Returns the key length.

  NOTE(review): the format uses '%*s' (minimum field width), not
  '%.*s' (precision); this is only correct while db/table are
  NUL-terminated strings whose strlen equals the passed lengths --
  confirm against callers.
  NOTE(review): the '-1' applied to the my_snprintf result looks
  suspicious; verify it against how keys are built elsewhere
  (e.g. build_table_filename) before changing.
*/
static size_t
ndb_open_tables__create_key(char* key_buf, size_t key_buf_length,
                            const char* db, size_t db_length,
                            const char* table, size_t table_length)
{
  size_t key_length = my_snprintf(key_buf, key_buf_length,
                                  "./%*s/%*s", db_length, db,
                                  table_length, table) - 1;
  assert(key_length > 0);
  assert(key_length < key_buf_length);

  return key_length;
}
945
946
947 /*
948 Check if table with given name is open, ie. is
949 in ndbcluster_open_tables hash
950 */
951 static bool
ndb_open_tables__is_table_open(const char * db,size_t db_length,const char * table,size_t table_length)952 ndb_open_tables__is_table_open(const char* db, size_t db_length,
953 const char* table, size_t table_length)
954 {
955 char key[FN_REFLEN + 1];
956 size_t key_length = ndb_open_tables__create_key(key, sizeof(key),
957 db, db_length,
958 table, table_length);
959 DBUG_ENTER("ndb_open_tables__is_table_open");
960 DBUG_PRINT("enter", ("db: '%s', table: '%s', key: '%s'",
961 db, table, key));
962
963 native_mutex_lock(&ndbcluster_mutex);
964 bool result = my_hash_search(&ndbcluster_open_tables,
965 (const uchar*)key,
966 key_length) != NULL;
967 native_mutex_unlock(&ndbcluster_mutex);
968
969 DBUG_PRINT("exit", ("result: %d", result));
970 DBUG_RETURN(result);
971 }
972
973
/* Check if the mysql.ndb_schema table is currently open */
static bool
ndbcluster_check_ndb_schema_share()
{
  return ndb_open_tables__is_table_open(STRING_WITH_LEN("mysql"),
                                        STRING_WITH_LEN("ndb_schema"));
}
980
981
/* Check if the mysql.ndb_apply_status table is currently open */
static bool
ndbcluster_check_ndb_apply_status_share()
{
  return ndb_open_tables__is_table_open(STRING_WITH_LEN("mysql"),
                                        STRING_WITH_LEN("ndb_apply_status"));
}
988
989
990 static bool
create_cluster_sys_table(THD * thd,const char * db,size_t db_length,const char * table,size_t table_length,const char * create_definitions,const char * create_options)991 create_cluster_sys_table(THD *thd, const char* db, size_t db_length,
992 const char* table, size_t table_length,
993 const char* create_definitions,
994 const char* create_options)
995 {
996 if (ndb_open_tables__is_table_open(db, db_length, table, table_length))
997 return false;
998
999 if (g_ndb_cluster_connection->get_no_ready() <= 0)
1000 return false;
1001
1002 if (opt_ndb_extra_logging)
1003 sql_print_information("NDB: Creating %s.%s", db, table);
1004
1005 Ndb_local_connection mysqld(thd);
1006
1007 /*
1008 Check if table exists in MySQL "dictionary"(i.e on disk)
1009 if so, remove it since there is none in Ndb
1010 */
1011 {
1012 char path[FN_REFLEN + 1];
1013 build_table_filename(path, sizeof(path) - 1,
1014 db, table, reg_ext, 0);
1015 if (my_delete(path, MYF(0)) == 0)
1016 {
1017 /*
1018 The .frm file existed and was deleted from disk.
1019 It's possible that someone has tried to use it and thus
1020 it might have been inserted in the table definition cache.
1021 It must be flushed to avoid that it exist only in the
1022 table definition cache.
1023 */
1024 if (opt_ndb_extra_logging)
1025 sql_print_information("NDB: Flushing %s.%s", db, table);
1026
1027 /* Flush mysql.ndb_apply_status table, ignore all errors */
1028 (void)mysqld.flush_table(db, db_length,
1029 table, table_length);
1030 }
1031 }
1032
1033 const bool create_if_not_exists = true;
1034 const bool res = mysqld.create_sys_table(db, db_length,
1035 table, table_length,
1036 create_if_not_exists,
1037 create_definitions,
1038 create_options);
1039 return res;
1040 }
1041
1042
1043 static bool
ndb_apply_table__create(THD * thd)1044 ndb_apply_table__create(THD *thd)
1045 {
1046 DBUG_ENTER("ndb_apply_table__create");
1047
1048 /* NOTE! Updating this table schema must be reflected in ndb_restore */
1049 const bool res =
1050 create_cluster_sys_table(thd,
1051 STRING_WITH_LEN("mysql"),
1052 STRING_WITH_LEN("ndb_apply_status"),
1053 // table_definition
1054 "server_id INT UNSIGNED NOT NULL,"
1055 "epoch BIGINT UNSIGNED NOT NULL, "
1056 "log_name VARCHAR(255) BINARY NOT NULL, "
1057 "start_pos BIGINT UNSIGNED NOT NULL, "
1058 "end_pos BIGINT UNSIGNED NOT NULL, "
1059 "PRIMARY KEY USING HASH (server_id)",
1060 // table_options
1061 "ENGINE=NDB CHARACTER SET latin1");
1062 DBUG_RETURN(res);
1063 }
1064
1065
1066 static bool
ndb_schema_table__create(THD * thd)1067 ndb_schema_table__create(THD *thd)
1068 {
1069 DBUG_ENTER("ndb_schema_table__create");
1070
1071 /* NOTE! Updating this table schema must be reflected in ndb_restore */
1072 const bool res =
1073 create_cluster_sys_table(thd,
1074 STRING_WITH_LEN("mysql"),
1075 STRING_WITH_LEN("ndb_schema"),
1076 // table_definition
1077 "db VARBINARY("
1078 NDB_MAX_DDL_NAME_BYTESIZE_STR
1079 ") NOT NULL,"
1080 "name VARBINARY("
1081 NDB_MAX_DDL_NAME_BYTESIZE_STR
1082 ") NOT NULL,"
1083 "slock BINARY(32) NOT NULL,"
1084 "query BLOB NOT NULL,"
1085 "node_id INT UNSIGNED NOT NULL,"
1086 "epoch BIGINT UNSIGNED NOT NULL,"
1087 "id INT UNSIGNED NOT NULL,"
1088 "version INT UNSIGNED NOT NULL,"
1089 "type INT UNSIGNED NOT NULL,"
1090 "PRIMARY KEY USING HASH (db,name)",
1091 // table_options
1092 "ENGINE=NDB CHARACTER SET latin1");
1093 DBUG_RETURN(res);
1094 }
1095
1096 class Thd_ndb_options_guard
1097 {
1098 public:
Thd_ndb_options_guard(Thd_ndb * thd_ndb)1099 Thd_ndb_options_guard(Thd_ndb *thd_ndb)
1100 : m_val(thd_ndb->options), m_save_val(thd_ndb->options) {}
~Thd_ndb_options_guard()1101 ~Thd_ndb_options_guard() { m_val= m_save_val; }
set(uint32 flag)1102 void set(uint32 flag) { m_val|= flag; }
1103 private:
1104 uint32 &m_val;
1105 uint32 m_save_val;
1106 };
1107
1108 extern int ndb_setup_complete;
1109 extern native_cond_t COND_ndb_setup_complete;
1110
1111 /*
1112 ndb_notify_tables_writable
1113
1114 Called to notify any waiting threads that Ndb tables are
1115 now writable
1116 */
/* Broadcast (under ndbcluster_mutex) that Ndb tables are now writable */
static void ndb_notify_tables_writable()
{
  native_mutex_lock(&ndbcluster_mutex);
  ndb_setup_complete= 1;  // flag checked by threads waiting on the condition
  native_cond_broadcast(&COND_ndb_setup_complete);
  native_mutex_unlock(&ndbcluster_mutex);
}
1124
1125
1126 /*
1127 Clean-up any stray files for non-existing NDB tables
1128 - "stray" means that there is a .frm + .ndb file on disk
1129 but there exists no such table in NDB. The two files
1130 can then be deleted from disk to get in synch with
1131 what's in NDB.
1132 */
1133 static
clean_away_stray_files(THD * thd)1134 void clean_away_stray_files(THD *thd)
1135 {
1136 DBUG_ENTER("clean_away_stray_files");
1137
1138 // Populate list of databases
1139 Ndb_find_files_list db_names(thd);
1140 if (!db_names.find_databases(mysql_data_home))
1141 {
1142 thd->clear_error();
1143 DBUG_PRINT("info", ("Failed to find databases"));
1144 DBUG_VOID_RETURN;
1145 }
1146
1147 LEX_STRING *db_name;
1148 while ((db_name= db_names.next()))
1149 {
1150 DBUG_PRINT("info", ("Found database %s", db_name->str));
1151 if (strcmp(NDB_REP_DB, db_name->str)) /* Skip system database */
1152 {
1153
1154 sql_print_information("NDB: Cleaning stray tables from database '%s'",
1155 db_name->str);
1156
1157 char path[FN_REFLEN + 1];
1158 build_table_filename(path, sizeof(path) - 1, db_name->str, "", "", 0);
1159
1160 /* Require that no binlog setup is attempted yet, that will come later
1161 * right now we just want to get rid of stray frms et al
1162 */
1163
1164 Thd_ndb *thd_ndb= get_thd_ndb(thd);
1165 thd_ndb->set_skip_binlog_setup_in_find_files(true);
1166 Ndb_find_files_list tab_names(thd);
1167 if (!tab_names.find_tables(db_name->str, path))
1168 {
1169 thd->clear_error();
1170 DBUG_PRINT("info", ("Failed to find tables"));
1171 }
1172 thd_ndb->set_skip_binlog_setup_in_find_files(false);
1173 }
1174 }
1175 DBUG_VOID_RETURN;
1176 }
1177
1178 /*
1179 Ndb has no representation of the database schema objects.
1180 The mysql.ndb_schema table contains the latest schema operations
1181 done via a mysqld, and thus reflects databases created/dropped/altered
1182 while a mysqld was disconnected. This function tries to recover
1183 the correct state w.r.t created databases using the information in
1184 that table.
1185
1186
1187 */
/*
  Scan mysql.ndb_schema and replay CREATE/ALTER DATABASE statements for
  databases missing on disk locally; remaining databases whose last
  operation was DROP are only reported.
  Returns 0 on success, 1 on error (missing global schema lock,
  non-temporary NDB error, or too many retries of a temporary one).
*/
static int ndbcluster_find_all_databases(THD *thd)
{
  Ndb *ndb= check_ndb_in_thd(thd);
  Thd_ndb *thd_ndb= get_thd_ndb(thd);
  Thd_ndb_options_guard thd_ndb_options(thd_ndb);
  NDBDICT *dict= ndb->getDictionary();
  NdbTransaction *trans= NULL;
  NdbError ndb_error;
  int retries= 100;
  int retry_sleep= 30; /* 30 milliseconds, transaction */
  DBUG_ENTER("ndbcluster_find_all_databases");

  /*
    Function should only be called while ndbcluster_global_schema_lock
    is held, to ensure that ndb_schema table is not being updated while
    scanning.
  */
  if (!thd_ndb->has_required_global_schema_lock("ndbcluster_find_all_databases"))
    DBUG_RETURN(1);

  ndb->setDatabaseName(NDB_REP_DB);
  /* The statements replayed below must not themselves be distributed */
  thd_ndb_options.set(TNO_NO_LOG_SCHEMA_OP);
  thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
  while (1)  // retry loop, restarted on temporary NDB errors
  {
    char db_buffer[FN_REFLEN];
    char *db= db_buffer+1;  // skip the length byte of the VARBINARY value
    char name[FN_REFLEN];
    char query[64000];
    Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
    const NDBTAB *ndbtab= ndbtab_g.get_table();
    NdbScanOperation *op;
    NdbBlob *query_blob_handle;
    int r= 0;
    if (ndbtab == NULL)
    {
      ndb_error= dict->getNdbError();
      goto error;
    }
    trans= ndb->startTransaction();
    if (trans == NULL)
    {
      ndb_error= ndb->getNdbError();
      goto error;
    }
    op= trans->getNdbScanOperation(ndbtab);
    if (op == NULL)
    {
      ndb_error= trans->getNdbError();
      goto error;
    }

    // Full tuple scan of ndb_schema, one parallel scan
    op->readTuples(NdbScanOperation::LM_Read,
                   NdbScanOperation::SF_TupScan, 1);

    // OR'ed error flags: any failed getValue leaves r non-zero
    r|= op->getValue("db", db_buffer) == NULL;
    r|= op->getValue("name", name) == NULL;
    r|= (query_blob_handle= op->getBlobHandle("query")) == NULL;
    r|= query_blob_handle->getValue(query, sizeof(query));

    if (r)
    {
      ndb_error= op->getNdbError();
      goto error;
    }

    if (trans->execute(NdbTransaction::NoCommit))
    {
      ndb_error= trans->getNdbError();
      goto error;
    }

    while ((r= op->nextResult()) == 0)
    {
      // Both db and name are length-prefixed (VARBINARY) values
      unsigned db_len= db_buffer[0];
      unsigned name_len= name[0];
      /*
        name_len == 0 means no table name, hence the row
        is for a database
      */
      if (db_len > 0 && name_len == 0)
      {
        /* database found */
        db[db_len]= 0;  // NUL-terminate the database name

        /* find query */
        Uint64 query_length= 0;
        if (query_blob_handle->getLength(query_length))
        {
          ndb_error= query_blob_handle->getNdbError();
          goto error;
        }
        query[query_length]= 0;  // NUL-terminate the query text
        // Reuse 'name' as path buffer to probe for the db directory
        build_table_filename(name, sizeof(name), db, "", "", 0);
        int database_exists= !my_access(name, F_OK);
        if (native_strncasecmp("CREATE", query, 6) == 0)
        {
          /* Database should exist */
          if (!database_exists)
          {
            /* create missing database */
            sql_print_information("NDB: Discovered missing database '%s'", db);
            const int no_print_error[1]= {0};
            run_query(thd, query, query + query_length,
                      no_print_error);
          }
        }
        else if (native_strncasecmp("ALTER", query, 5) == 0)
        {
          /* Database should exist */
          if (!database_exists)
          {
            /* create missing database */
            sql_print_information("NDB: Discovered missing database '%s'", db);
            const int no_print_error[1]= {0};
            // Create the database first, then replay the ALTER on it
            name_len= (unsigned)my_snprintf(name, sizeof(name), "CREATE DATABASE %s", db);
            run_query(thd, name, name + name_len,
                      no_print_error);
            run_query(thd, query, query + query_length,
                      no_print_error);
          }
        }
        else if (native_strncasecmp("DROP", query, 4) == 0)
        {
          /* Database should not exist */
          if (database_exists)
          {
            /* drop missing database */
            /* NOTE(review): the remaining database is only reported here,
               no DROP is executed - confirm this is intentional */
            sql_print_information("NDB: Discovered remaining database '%s'", db);
          }
        }
      }
    }
    if (r == -1)  // scan terminated with error
    {
      ndb_error= op->getNdbError();
      goto error;
    }
    ndb->closeTransaction(trans);
    trans= NULL;
    DBUG_RETURN(0); // success
  error:
    if (trans)
    {
      ndb->closeTransaction(trans);
      trans= NULL;
    }
    if (ndb_error.status == NdbError::TemporaryError && !thd->killed)
    {
      if (retries--)
      {
        sql_print_warning("NDB: ndbcluster_find_all_databases retry: %u - %s",
                          ndb_error.code,
                          ndb_error.message);
        do_retry_sleep(retry_sleep);
        continue; // retry
      }
    }
    if (!thd->killed)
    {
      sql_print_error("NDB: ndbcluster_find_all_databases fail: %u - %s",
                      ndb_error.code,
                      ndb_error.message);
    }

    DBUG_RETURN(1); // not temp error or too many retries
  }
}
1356
1357
1358 /*
1359 find all tables in ndb and discover those needed
1360 */
/*
  List all user tables in NDB and, for each one whose local .frm is
  missing or differs from the frm stored in NDB, discover (recreate)
  it locally; otherwise set up binlogging for it.
  Returns HA_ERR_NO_CONNECTION if no Ndb object, 1 if listObjects
  fails, otherwise -(skipped + unhandled) (0 means all handled).
*/
static
int ndbcluster_find_all_files(THD *thd)
{
  Ndb* ndb;
  char key[FN_REFLEN + 1];
  NDBDICT *dict;
  int unhandled= 0, retries= 5, skipped= 0;
  DBUG_ENTER("ndbcluster_find_all_files");

  if (!(ndb= check_ndb_in_thd(thd)))
    DBUG_RETURN(HA_ERR_NO_CONNECTION);

  dict= ndb->getDictionary();

  do  // repeat while some tables remain unhandled, up to 'retries' passes
  {
    NdbDictionary::Dictionary::List list;
    if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
      DBUG_RETURN(1);
    unhandled= 0;
    skipped= 0;
    retries--;
    for (uint i= 0 ; i < list.count ; i++)
    {
      NDBDICT::List::Element& elmt= list.elements[i];
      // Ignore temporary tables and blob part tables
      if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name))
      {
        DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name));
        continue;
      }
      DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
      // Only tables in a stable state are set up; others counted as skipped
      if (elmt.state != NDBOBJ::StateOnline &&
          elmt.state != NDBOBJ::StateBackup &&
          elmt.state != NDBOBJ::StateBuilding)
      {
        sql_print_information("NDB: skipping setup table %s.%s, in state %d",
                              elmt.database, elmt.name, elmt.state);
        skipped++;
        continue;
      }

      ndb->setDatabaseName(elmt.database);
      Ndb_table_guard ndbtab_g(dict, elmt.name);
      const NDBTAB *ndbtab= ndbtab_g.get_table();
      if (!ndbtab)
      {
        // Table def could not be fetched; only log on the final pass
        if (retries == 0)
          sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
                          elmt.database, elmt.name,
                          dict->getNdbError().code,
                          dict->getNdbError().message);
        unhandled++;
        continue;
      }

      // No frm stored in NDB, nothing to compare/discover against
      if (ndbtab->getFrmLength() == 0)
        continue;

      /* check if database exists */
      char *end= key +
        build_table_filename(key, sizeof(key) - 1, elmt.database, "", "", 0);
      if (my_access(key, F_OK))
      {
        /* no such database defined, skip table */
        continue;
      }
      /* finalize construction of path */
      end+= tablename_to_filename(elmt.name, end,
                                  (uint)(sizeof(key)-(end-key)));
      uchar *data= 0, *pack_data= 0;
      size_t length, pack_length;
      int discover= 0;
      if (readfrm(key, &data, &length) ||
          packfrm(data, length, &pack_data, &pack_length))
      {
        // Local .frm missing or unreadable -> rediscover from NDB
        discover= 1;
        sql_print_information("NDB: missing frm for %s.%s, discovering...",
                              elmt.database, elmt.name);
      }
      else if (cmp_frm(ndbtab, pack_data, pack_length))
      {
        // Local and NDB frm differ; unless an online alter is in
        // progress (NSS_ALTERED), rediscover the table
        /* ndb_share reference temporary */
        NDB_SHARE *share= get_share(key, 0, FALSE);
        if (share)
        {
          DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
                                   share->key_string(), share->use_count));
        }
        if (!share || get_ndb_share_state(share) != NSS_ALTERED)
        {
          discover= 1;
          sql_print_information("NDB: mismatch in frm for %s.%s,"
                                " discovering...",
                                elmt.database, elmt.name);
        }
        if (share)
        {
          /* ndb_share reference temporary free */
          DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
                                   share->key_string(), share->use_count));
          free_share(&share);
        }
      }
      my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
      my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));

      if (discover)
      {
        /* ToDo 4.1 database needs to be created if missing */
        if (ndb_create_table_from_engine(thd, elmt.database, elmt.name))
        {
          /* ToDo 4.1 handle error */
        }
      }
      else
      {
        /* set up replication for this table */
        ndbcluster_create_binlog_setup(thd, ndb, key, (uint)(end-key),
                                       elmt.database, elmt.name,
                                       0);
      }
    }
  }
  while (unhandled && retries);

  DBUG_RETURN(-(skipped + unhandled));
}
1488
1489
/*
  Complete binlog setup for NDB: make sure the mysql.ndb_schema and
  mysql.ndb_apply_status utility tables exist, clean away stray files,
  recover databases and discover tables, then mark the tables as
  inited and wake up any waiters.
  Returns true when setup completed (or was already done), false when
  setup must be retried later.
*/
bool
ndb_binlog_setup(THD *thd)
{
  if (ndb_binlog_tables_inited)
    return true; // Already setup -> OK

  /*
    Can't proceed unless ndb binlog thread has setup
    the schema_ndb pointer(since that pointer is used for
    creating the event operations owned by ndb_schema_share)
  */
  native_mutex_lock(&injector_mutex);
  if (!schema_ndb)
  {
    native_mutex_unlock(&injector_mutex);
    return false;
  }
  native_mutex_unlock(&injector_mutex);

  /*
    Take the global schema lock to make sure that
    the schema is not changed in the cluster while
    running setup.
  */
  Ndb_global_schema_lock_guard global_schema_lock_guard(thd);
  if (global_schema_lock_guard.lock(false, false))
    return false;

  // Try to discover ndb_schema from NDB; create it if still missing
  if (!ndb_schema_share &&
      ndbcluster_check_ndb_schema_share() == 0)
  {
    ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
    if (!ndb_schema_share)
    {
      ndb_schema_table__create(thd);
      // always make sure we create the 'schema' first
      if (!ndb_schema_share)
        return false;
    }
  }
  // Same procedure for ndb_apply_status
  if (!ndb_apply_status_share &&
      ndbcluster_check_ndb_apply_status_share() == 0)
  {
    ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
    if (!ndb_apply_status_share)
    {
      ndb_apply_table__create(thd);
      if (!ndb_apply_status_share)
        return false;
    }
  }

  clean_away_stray_files(thd);

  if (ndbcluster_find_all_databases(thd))
  {
    return false;
  }

  if (ndbcluster_find_all_files(thd))
  {
    return false;
  }

  ndb_binlog_tables_inited= TRUE;

  if (ndb_binlog_running && ndb_binlog_is_ready)
  {
    if (opt_ndb_extra_logging)
      sql_print_information("NDB Binlog: ndb tables writable");

    // Force reopen so cached tables pick up the now-writable state
    ndb_tdc_close_cached_tables();

    /*
      Signal any waiting thread that ndb table setup is
      now complete
    */
    ndb_notify_tables_writable();
  }

  /* Signal injector thread that all is setup */
  native_cond_signal(&injector_cond);

  return true; // Setup completed -> OK
}
1575
1576 /*
1577 Defines and struct for schema table.
1578 Should reflect table definition above.
1579 */
#define SCHEMA_DB_I 0u        // db VARBINARY
#define SCHEMA_NAME_I 1u      // name VARBINARY
#define SCHEMA_SLOCK_I 2u     // slock BINARY(32), subscriber ack bitmap
#define SCHEMA_QUERY_I 3u     // query BLOB
#define SCHEMA_NODE_ID_I 4u   // node_id INT UNSIGNED
#define SCHEMA_EPOCH_I 5u     // epoch BIGINT UNSIGNED
#define SCHEMA_ID_I 6u        // id INT UNSIGNED (table id)
#define SCHEMA_VERSION_I 7u   // version INT UNSIGNED (table version)
#define SCHEMA_TYPE_I 8u      // type INT UNSIGNED (SCHEMA_OP_TYPE)
#define SCHEMA_SIZE 9u        // number of columns in ndb_schema
#define SCHEMA_SLOCK_SIZE 32u // bytes in slock, must match BINARY(32)
1591
1592
1593 /*
1594 log query in schema table
1595 */
/*
  Log an informational progress line while waiting for 'op' on 'obj',
  including the latest handled/received/injector epochs and, when
  given, the first two words of the subscriber bitmap.
*/
static void ndb_report_waiting(const char *key,
                               int the_time,
                               const char *op,
                               const char *obj,
                               const MY_BITMAP * map)
{
  ulonglong ndb_latest_epoch= 0;
  const char *proc_info= "<no info>";
  // Snapshot injector state under its mutex
  native_mutex_lock(&injector_mutex);
  if (injector_ndb)
    ndb_latest_epoch= injector_ndb->getLatestGCI();
  if (injector_thd)
    proc_info= injector_thd->proc_info;
  native_mutex_unlock(&injector_mutex);
  if (map == 0)
  {
    sql_print_information("NDB %s:"
                          " waiting max %u sec for %s %s."
                          "  epochs: (%u/%u,%u/%u,%u/%u)"
                          "  injector proc_info: %s"
                          ,key, the_time, op, obj
                          ,(uint)(ndb_latest_handled_binlog_epoch >> 32)
                          ,(uint)(ndb_latest_handled_binlog_epoch)
                          ,(uint)(ndb_latest_received_binlog_epoch >> 32)
                          ,(uint)(ndb_latest_received_binlog_epoch)
                          ,(uint)(ndb_latest_epoch >> 32)
                          ,(uint)(ndb_latest_epoch)
                          ,proc_info
                          );
  }
  else
  {
    /* NOTE(review): assumes map->bitmap has at least two words - confirm
       against the slock bitmap size (SCHEMA_SLOCK_SIZE) */
    sql_print_information("NDB %s:"
                          " waiting max %u sec for %s %s."
                          "  epochs: (%u/%u,%u/%u,%u/%u)"
                          "  injector proc_info: %s map: %x%x"
                          ,key, the_time, op, obj
                          ,(uint)(ndb_latest_handled_binlog_epoch >> 32)
                          ,(uint)(ndb_latest_handled_binlog_epoch)
                          ,(uint)(ndb_latest_received_binlog_epoch >> 32)
                          ,(uint)(ndb_latest_received_binlog_epoch)
                          ,(uint)(ndb_latest_epoch >> 32)
                          ,(uint)(ndb_latest_epoch)
                          ,proc_info
                          ,map->bitmap[0]
                          ,map->bitmap[1]
                          );
  }
}
1645
1646
1647 extern void update_slave_api_stats(Ndb*);
1648
/*
  Distribute a schema operation to other mysqld's by writing a row to
  mysql.ndb_schema, then wait for all subscribers to clear their bit
  in the slock bitmap (acknowledge).
  Returns 0 on success or when distribution does not apply,
  ER_TOO_LONG_IDENT for an over-long database name, 1 on allocation
  failure.
*/
int ndbcluster_log_schema_op(THD *thd,
                             const char *query, int query_length,
                             const char *db, const char *table_name,
                             uint32 ndb_table_id,
                             uint32 ndb_table_version,
                             enum SCHEMA_OP_TYPE type,
                             const char *new_db, const char *new_table_name,
                             bool log_query_on_participant)
{
  DBUG_ENTER("ndbcluster_log_schema_op");
  Thd_ndb *thd_ndb= get_thd_ndb(thd);
  if (!thd_ndb)
  {
    // Lazily attach a Thd_ndb to this THD
    if (!(thd_ndb= Thd_ndb::seize(thd)))
    {
      sql_print_error("Could not allocate Thd_ndb object");
      DBUG_RETURN(1);
    }
    thd_set_thd_ndb(thd, thd_ndb);
  }

  DBUG_PRINT("enter",
             ("query: %s db: %s table_name: %s thd_ndb->options: %d",
              query, db, table_name, thd_ndb->options));
  // No distribution possible/wanted: schema share gone or logging disabled
  if (!ndb_schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
  {
    if (thd->slave_thread)
      update_slave_api_stats(thd_ndb->ndb);

    DBUG_RETURN(0);
  }

  /* Check that the database name will fit within limits */
  if(strlen(db) > NDB_MAX_DDL_NAME_BYTESIZE)
  {
    // Catch unexpected commands with too long db length
    assert(type == SOT_CREATE_DB ||
           type == SOT_ALTER_DB ||
           type == SOT_DROP_DB);
    push_warning_printf(thd, Sql_condition::SL_WARNING,
                        ER_TOO_LONG_IDENT,
                        "Ndb has an internal limit of %u bytes on the size of schema identifiers",
                        NDB_MAX_DDL_NAME_BYTESIZE);
    DBUG_RETURN(ER_TOO_LONG_IDENT);
  }

  char tmp_buf2[FN_REFLEN];
  char quoted_table1[2 + 2 * FN_REFLEN + 1];
  char quoted_db1[2 + 2 * FN_REFLEN + 1];
  char quoted_db2[2 + 2 * FN_REFLEN + 1];
  char quoted_table2[2 + 2 * FN_REFLEN + 1];
  size_t id_length= 0;
  const char *type_str;
  uint32 log_type= (uint32)type;
  // Map the operation type to a log label; DROP/RENAME rewrite the
  // query into a single-table statement in tmp_buf2
  switch (type)
  {
  case SOT_DROP_TABLE:
    /* drop database command, do not log at drop table */
    if (thd->lex->sql_command == SQLCOM_DROP_DB)
      DBUG_RETURN(0);
    /*
      Rewrite the drop table query as it may contain several tables
      but drop_table() is called once for each table in the query
      ie. DROP TABLE t1, t2;
          -> DROP TABLE t1 + DROP TABLE t2
    */

    query= tmp_buf2;
    id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
                                            table_name, 0);
    quoted_table1[id_length]= '\0';
    id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db1,
                                            db, 0);
    quoted_db1[id_length]= '\0';
    query_length= (uint) (strxmov(tmp_buf2, "drop table ", quoted_db1, ".",
                                  quoted_table1, NullS) - tmp_buf2);
    type_str= "drop table";
    break;
  case SOT_RENAME_TABLE_PREPARE:
    type_str= "rename table prepare";
    break;
  case SOT_RENAME_TABLE:
    /*
      Rewrite the rename table query as it may contain several tables
      but rename_table() is called once for each table in the query
      ie. RENAME TABLE t1 to tx, t2 to ty;
          -> RENAME TABLE t1 to tx + RENAME TABLE t2 to ty
    */
    query= tmp_buf2;
    id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db1,
                                            db, 0);
    quoted_db1[id_length]= '\0';
    id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
                                            table_name, 0);
    quoted_table1[id_length]= '\0';
    id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db2,
                                            new_db, 0);
    quoted_db2[id_length]= '\0';
    id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table2,
                                            new_table_name, 0);
    quoted_table2[id_length]= '\0';
    query_length= (uint) (strxmov(tmp_buf2, "rename table ",
                                  quoted_db1, ".", quoted_table1, " to ",
                                  quoted_db2, ".", quoted_table2, NullS) - tmp_buf2);
    type_str= "rename table";
    break;
  case SOT_CREATE_TABLE:
    type_str= "create table";
    break;
  case SOT_ALTER_TABLE_COMMIT:
    type_str= "alter table";
    break;
  case SOT_ONLINE_ALTER_TABLE_PREPARE:
    type_str= "online alter table prepare";
    break;
  case SOT_ONLINE_ALTER_TABLE_COMMIT:
    type_str= "online alter table commit";
    break;
  case SOT_DROP_DB:
    type_str= "drop db";
    break;
  case SOT_CREATE_DB:
    type_str= "create db";
    break;
  case SOT_ALTER_DB:
    type_str= "alter db";
    break;
  case SOT_TABLESPACE:
    type_str= "tablespace";
    break;
  case SOT_LOGFILE_GROUP:
    type_str= "logfile group";
    break;
  case SOT_TRUNCATE_TABLE:
    type_str= "truncate table";
    break;
  case SOT_CREATE_USER:
    type_str= "create user";
    break;
  case SOT_DROP_USER:
    type_str= "drop user";
    break;
  case SOT_RENAME_USER:
    type_str= "rename user";
    break;
  case SOT_GRANT:
    type_str= "grant/revoke";
    break;
  case SOT_REVOKE:
    type_str= "revoke all";
    break;
  default:
    abort(); /* should not happen, programming error */
  }

  // Get (or create) the schema object used to track acknowledgements
  NDB_SCHEMA_OBJECT *ndb_schema_object;
  {
    char key[FN_REFLEN + 1];
    build_table_filename(key, sizeof(key) - 1, db, table_name, "", 0);
    ndb_schema_object= ndb_get_schema_object(key, true);
    ndb_schema_object->table_id= ndb_table_id;
    ndb_schema_object->table_version= ndb_table_version;
  }

  const NdbError *ndb_error= 0;
  // Use nodeid of the primary cluster connection since that is
  // the nodeid which the coordinator and participants listen to
  const uint32 node_id= g_ndb_cluster_connection->node_id();
  Uint64 epoch= 0;
  {
    /* begin protect ndb_schema_share */
    native_mutex_lock(&ndb_schema_share_mutex);
    if (ndb_schema_share == 0)
    {
      // Schema distribution no longer available, nothing to do
      native_mutex_unlock(&ndb_schema_share_mutex);
      ndb_free_schema_object(&ndb_schema_object);
      DBUG_RETURN(0);
    }
    native_mutex_unlock(&ndb_schema_share_mutex);
  }

  Ndb *ndb= thd_ndb->ndb;
  char save_db[FN_REFLEN];
  strcpy(save_db, ndb->getDatabaseName());  // restored before returning

  char tmp_buf[FN_REFLEN];
  NDBDICT *dict= ndb->getDictionary();
  ndb->setDatabaseName(NDB_REP_DB);
  Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
  const NDBTAB *ndbtab= ndbtab_g.get_table();
  NdbTransaction *trans= 0;
  int retries= 100;
  int retry_sleep= 30; /* 30 milliseconds, transaction */
  const NDBCOL *col[SCHEMA_SIZE];
  unsigned sz[SCHEMA_SIZE];

  if (ndbtab == 0)
  {
    // Missing ndb_schema is only an error unless the op concerns
    // the ndb_schema table itself
    if (strcmp(NDB_REP_DB, db) != 0 ||
        strcmp(NDB_SCHEMA_TABLE, table_name))
    {
      ndb_error= &dict->getNdbError();
    }
    goto end;
  }

  {
    // Cache column handles and lengths (query is a blob, no length)
    uint i;
    for (i= 0; i < SCHEMA_SIZE; i++)
    {
      col[i]= ndbtab->getColumn(i);
      if (i != SCHEMA_QUERY_I)
      {
        sz[i]= col[i]->getLength();
        assert(sz[i] <= sizeof(tmp_buf));
      }
    }
  }

  while (1)  // retry loop for temporary NDB errors
  {
    const char *log_db= db;
    const char *log_tab= table_name;
    const char *log_subscribers= (char*)ndb_schema_object->slock;
    if ((trans= ndb->startTransaction()) == 0)
      goto err;
    // Inner loop runs once; 'break' reaches the execute below,
    // errors jump to err:
    while (1)
    {
      NdbOperation *op= 0;
      int r= 0;
      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
      assert(r == 0);
      r|= op->writeTuple();
      assert(r == 0);

      /* db */
      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, (int)strlen(log_db));
      r|= op->equal(SCHEMA_DB_I, tmp_buf);
      assert(r == 0);
      /* name */
      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
                       (int)strlen(log_tab));
      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
      assert(r == 0);
      /* slock */
      assert(sz[SCHEMA_SLOCK_I] ==
             no_bytes_in_map(&ndb_schema_object->slock_bitmap));
      r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
      assert(r == 0);
      /* query */
      {
        NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
        assert(ndb_blob != 0);
        uint blob_len= query_length;
        const char* blob_ptr= query;
        r|= ndb_blob->setValue(blob_ptr, blob_len);
        assert(r == 0);
      }
      /* node_id */
      r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
      assert(r == 0);
      /* epoch */
      r|= op->setValue(SCHEMA_EPOCH_I, epoch);
      assert(r == 0);
      /* id */
      r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
      assert(r == 0);
      /* version */
      r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
      assert(r == 0);
      /* type */
      r|= op->setValue(SCHEMA_TYPE_I, log_type);
      assert(r == 0);
      /* any value */
      Uint32 anyValue = 0;
      if (! thd->slave_thread)
      {
        /* Schema change originating from this MySQLD, check SQL_LOG_BIN
         * variable and pass 'setting' to all logging MySQLDs via AnyValue
         */
        if (thd_options(thd) & OPTION_BIN_LOG) /* e.g. SQL_LOG_BIN == on */
        {
          DBUG_PRINT("info", ("Schema event for binlogging"));
          ndbcluster_anyvalue_set_normal(anyValue);
        }
        else
        {
          DBUG_PRINT("info", ("Schema event not for binlogging"));
          ndbcluster_anyvalue_set_nologging(anyValue);
        }

        if(!log_query_on_participant)
        {
          DBUG_PRINT("info", ("Forcing query not to be binlogged on participant"));
          ndbcluster_anyvalue_set_nologging(anyValue);
        }
      }
      else
      {
        /*
           Slave propagating replicated schema event in ndb_schema
           In case replicated serverId is composite
           (server-id-bits < 31) we copy it into the
           AnyValue as-is
           This is for 'future', as currently Schema operations
           do not have composite AnyValues.
           In future it may be useful to support *not* mapping composite
           AnyValues to/from Binlogged server-ids.
        */
        DBUG_PRINT("info", ("Replicated schema event with original server id %d",
                            thd->server_id));
        anyValue = thd_unmasked_server_id(thd);
      }

#ifndef NDEBUG
      /*
        MySQLD will set the user-portion of AnyValue (if any) to all 1s
        This tests code filtering ServerIds on the value of server-id-bits.
      */
      const char* p = getenv("NDB_TEST_ANYVALUE_USERDATA");
      if (p != 0 && *p != 0 && *p != '0' && *p != 'n' && *p != 'N')
      {
        dbug_ndbcluster_anyvalue_set_userbits(anyValue);
      }
#endif
      r|= op->setAnyValue(anyValue);
      assert(r == 0);
      break;
    }
    if (trans->execute(NdbTransaction::Commit, NdbOperation::DefaultAbortOption,
                       1 /* force send */) == 0)
    {
      DBUG_PRINT("info", ("logged: %s", query));
      dict->forceGCPWait(1);  // make the epoch containing the write complete
      break;
    }
err:
    const NdbError *this_error= trans ?
      &trans->getNdbError() : &ndb->getNdbError();
    if (this_error->status == NdbError::TemporaryError && !thd->killed)
    {
      if (retries--)
      {
        if (trans)
          ndb->closeTransaction(trans);
        do_retry_sleep(retry_sleep);
        continue; // retry
      }
    }
    ndb_error= this_error;
    break;
  }
end:
  if (ndb_error)
    push_warning_printf(thd, Sql_condition::SL_WARNING,
                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                        ndb_error->code,
                        ndb_error->message,
                        "Could not log query '%s' on other mysqld's");

  if (trans)
    ndb->closeTransaction(trans);
  ndb->setDatabaseName(save_db);

  if (opt_ndb_extra_logging > 19)
  {
    sql_print_information("NDB: distributed %s.%s(%u/%u) type: %s(%u) query: \'%s\' to %x%x",
                          db,
                          table_name,
                          ndb_table_id,
                          ndb_table_version,
                          get_schema_type_name(log_type),
                          log_type,
                          query,
                          ndb_schema_object->slock_bitmap.bitmap[0],
                          ndb_schema_object->slock_bitmap.bitmap[1]);
  }

  /*
    Wait for other mysqld's to acknowledge the table operation
  */
  if (ndb_error == 0 && !bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
  {
    int max_timeout= DEFAULT_SYNC_TIMEOUT;
    native_mutex_lock(&ndb_schema_object->mutex);
    while (1)
    {
      struct timespec abstime;
      set_timespec(&abstime, 1);  // wake up at least once per second

      // Unlock the schema object and wait for injector to signal that
      // something has happened. (NOTE! convoluted in order to
      // only use injector_cond with injector_mutex)
      native_mutex_unlock(&ndb_schema_object->mutex);
      native_mutex_lock(&injector_mutex);
      int ret= native_cond_timedwait(&injector_cond,
                                     &injector_mutex,
                                     &abstime);
      native_mutex_unlock(&injector_mutex);
      native_mutex_lock(&ndb_schema_object->mutex);

      if (thd->killed)
        break;

      /* begin protect ndb_schema_share */
      native_mutex_lock(&ndb_schema_share_mutex);
      if (ndb_schema_share == 0)
      {
        native_mutex_unlock(&ndb_schema_share_mutex);
        break;
      }
      native_mutex_unlock(&ndb_schema_share_mutex);
      /* end protect ndb_schema_share */

      // All subscribers have cleared their bit -> acknowledged
      if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
        break;

      if (ret)  // timed out this round; count down and report
      {
        max_timeout--;
        if (max_timeout == 0)
        {
          sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
                          type_str, ndb_schema_object->key);
          assert(false);
          break;
        }
        if (opt_ndb_extra_logging)
          ndb_report_waiting(type_str, max_timeout,
                             "distributing", ndb_schema_object->key,
                             &ndb_schema_object->slock_bitmap);
      }
    }
    native_mutex_unlock(&ndb_schema_object->mutex);
  }
  else if (ndb_error)
  {
    sql_print_error("NDB %s: distributing %s err: %u",
                    type_str, ndb_schema_object->key,
                    ndb_error->code);
  }
  else if (opt_ndb_extra_logging > 19)
  {
    sql_print_information("NDB %s: not waiting for distributing %s",
                          type_str, ndb_schema_object->key);
  }

  ndb_free_schema_object(&ndb_schema_object);

  if (opt_ndb_extra_logging > 19)
  {
    sql_print_information("NDB: distribution of %s.%s(%u/%u) type: %s(%u) query: \'%s\'"
                          " - complete!",
                          db,
                          table_name,
                          ndb_table_id,
                          ndb_table_version,
                          get_schema_type_name(log_type),
                          log_type,
                          query);
  }

  if (thd->slave_thread)
    update_slave_api_stats(ndb);

  DBUG_RETURN(0);
}
2116
2117
/*
  Handle that an event subscription has ended for a table, either
  because the table was dropped (TE_DROP) or because the connection
  to the cluster was lost (TE_CLUSTER_FAILURE).

  Invalidates the NdbApi dictionary cache entry, detaches the event
  operation from the NDB_SHARE, signals any waiting drop/rename,
  drops the event operation and releases share references.

  @param thd         Thread handle
  @param is_ndb      The Ndb object the event operation belongs to
  @param pOp         The event operation which reported TE_DROP/TE_CLUSTER_FAILURE
  @param event_data  Event data attached to pOp (deleted by this function)

  @return 0 (always)
*/
static
int
ndb_handle_schema_change(THD *thd, Ndb *is_ndb, NdbEventOperation *pOp,
                         const Ndb_event_data *event_data)
{
  DBUG_ENTER("ndb_handle_schema_change");
  DBUG_PRINT("enter", ("pOp: %p", pOp));

  // Only called for TE_DROP and TE_CLUSTER_FAILURE event
  assert(pOp->getEventType() == NDBEVENT::TE_DROP ||
         pOp->getEventType() == NDBEVENT::TE_CLUSTER_FAILURE);

  assert(event_data);

  NDB_SHARE *share= event_data->share;
  dbug_print_share("changed share: ", share);

  // The shadow table provides the db and table name of the affected table
  TABLE *shadow_table= event_data->shadow_table;
  const char *tabname= shadow_table->s->table_name.str;
  const char *dbname= shadow_table->s->db.str;
  {
    // Invalidate the NdbApi dictionary cache entry, but only when the
    // cached table is the same object (same id) and not newer (version)
    // than the table this event operation was created for
    Thd_ndb *thd_ndb= get_thd_ndb(thd);
    Ndb *ndb= thd_ndb->ndb;
    NDBDICT *dict= ndb->getDictionary();
    ndb->setDatabaseName(dbname);
    Ndb_table_guard ndbtab_g(dict, tabname);
    const NDBTAB *ev_tab= pOp->getTable();
    const NDBTAB *cache_tab= ndbtab_g.get_table();
    if (cache_tab &&
        cache_tab->getObjectId() == ev_tab->getObjectId() &&
        cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
      ndbtab_g.invalidate();
  }

  // Detach the event operation from the share (it may be registered
  // either as the current 'op' or as the pending 'new_op')
  native_mutex_lock(&share->mutex);
  assert(share->state == NSS_DROPPED ||
         share->op == pOp || share->new_op == pOp);
  if (share->new_op)
  {
    share->new_op= 0;
  }
  if (share->op)
  {
    share->op= 0;
  }
  native_mutex_unlock(&share->mutex);

  /* Signal ha_ndbcluster::delete/rename_table that drop is done */
  DBUG_PRINT("info", ("signal that drop is done"));
  (void) native_cond_signal(&injector_cond);

  native_mutex_lock(&ndbcluster_mutex);
  /* ndb_share reference binlog free */
  DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
                           share->key_string(), share->use_count));
  free_share(&share, TRUE);

  bool do_close_cached_tables= FALSE;
  // The change is "remote" when the requesting node id is not this mysqld's
  bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId());
  if (is_remote_change && share && share->state != NSS_DROPPED)
  {
    DBUG_PRINT("info", ("remote change"));
    ndbcluster_mark_share_dropped(share);
    if (share->use_count != 1)
    {
      /* open handler holding reference */
      /* wait with freeing create ndb_share to below */
      do_close_cached_tables= TRUE;
    }
    else
    {
      /* ndb_share reference create free */
      DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
                               share->key_string(), share->use_count));
      free_share(&share, TRUE);
      share= 0;
    }
  }
  else
    share= 0;
  native_mutex_unlock(&ndbcluster_mutex);

  // Delete the event_data and clear the custom data pointer
  // before dropping the event operation it was attached to
  DBUG_PRINT("info", ("Deleting event_data"));
  delete event_data;
  pOp->setCustomData(NULL);

  DBUG_PRINT("info", ("Dropping event operation: %p", pOp));
  native_mutex_lock(&injector_mutex);
  is_ndb->dropEventOperation(pOp);
  native_mutex_unlock(&injector_mutex);

  if (do_close_cached_tables)
  {
    // Close the table definition cache entry and release the
    // remaining "create" reference which was kept above
    ndb_tdc_close_cached_table(thd, dbname, tabname);
    /* ndb_share reference create free */
    DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
                             share->key_string(), share->use_count));
    free_share(&share);
  }
  DBUG_RETURN(0);
}
2219
2220
2221 class Mutex_guard
2222 {
2223 public:
Mutex_guard(native_mutex_t & mutex)2224 Mutex_guard(native_mutex_t &mutex) : m_mutex(mutex)
2225 {
2226 native_mutex_lock(&m_mutex);
2227 };
~Mutex_guard()2228 ~Mutex_guard()
2229 {
2230 native_mutex_unlock(&m_mutex);
2231 };
2232 private:
2233 native_mutex_t &m_mutex;
2234 };
2235
2236
2237 /*
2238 Data used by the Ndb_schema_event_handler which lives
2239 as long as the NDB Binlog thread is connected to the cluster.
2240
2241 NOTE! An Ndb_schema_event_handler instance only lives for one epoch
2242
2243 */
2244 class Ndb_schema_dist_data {
2245 static const uint max_ndb_nodes= 256; /* multiple of 32 */
2246 uchar m_data_node_id_list[max_ndb_nodes];
2247 /*
2248 The subscribers to ndb_schema are tracked separately for each
2249 data node. This avoids the need to know which data nodes are
2250 connected.
2251 An api counts as subscribed as soon as one of the data nodes
2252 report it as subscibed.
2253 */
2254 MY_BITMAP *subscriber_bitmap;
2255 unsigned m_num_bitmaps;
2256
2257 // Holds the new key for a table to be renamed
2258 struct NDB_SHARE_KEY* m_prepared_rename_key;
2259 public:
2260 Ndb_schema_dist_data(const Ndb_schema_dist_data&); // Not implemented
Ndb_schema_dist_data()2261 Ndb_schema_dist_data() :
2262 subscriber_bitmap(NULL),
2263 m_num_bitmaps(0),
2264 m_prepared_rename_key(NULL)
2265 {}
2266
init(Ndb_cluster_connection * cluster_connection)2267 void init(Ndb_cluster_connection* cluster_connection)
2268 {
2269 // Initialize "g_node_id_map" which maps from nodeid to index in
2270 // subscriber bitmaps array. The mapping array is only used when
2271 // the NDB binlog thread handles events on the mysql.ndb_schema table
2272 uint node_id, i= 0;
2273 Ndb_cluster_connection_node_iter node_iter;
2274 memset((void *)m_data_node_id_list, 0xFFFF, sizeof(m_data_node_id_list));
2275 while ((node_id= cluster_connection->get_next_node(node_iter)))
2276 m_data_node_id_list[node_id]= i++;
2277
2278 {
2279 // Create array of bitmaps for keeping track of subscribed nodes
2280 unsigned no_nodes= cluster_connection->no_db_nodes();
2281 subscriber_bitmap= (MY_BITMAP*)my_malloc(PSI_INSTRUMENT_ME,
2282 no_nodes * sizeof(MY_BITMAP),
2283 MYF(MY_WME));
2284 for (unsigned i= 0; i < no_nodes; i++)
2285 {
2286 bitmap_init(&subscriber_bitmap[i],
2287 (Uint32*)my_malloc(PSI_INSTRUMENT_ME,
2288 max_ndb_nodes/8, MYF(MY_WME)),
2289 max_ndb_nodes, FALSE);
2290 bitmap_clear_all(&subscriber_bitmap[i]);
2291 }
2292 // Remember the number of bitmaps allocated
2293 m_num_bitmaps = no_nodes;
2294 }
2295 }
2296
release(void)2297 void release(void)
2298 {
2299 if (!m_num_bitmaps)
2300 {
2301 // Allow release without init(), happens when binlog thread
2302 // is terminated before connection to cluster has been made
2303 // NOTE! Should be possible to use static memory for the arrays
2304 return;
2305 }
2306
2307 for (unsigned i= 0; i < m_num_bitmaps; i++)
2308 {
2309 // Free memory allocated for the bitmap
2310 // allocated by my_malloc() and passed as "buf" to bitmap_init()
2311 bitmap_free(&subscriber_bitmap[i]);
2312 }
2313 // Free memory allocated for the bitmap array
2314 my_free(subscriber_bitmap);
2315 m_num_bitmaps = 0;
2316
2317 // Release the prepared rename key, it's very unlikely
2318 // that the key is still around here, but just in case
2319 NDB_SHARE::free_key(m_prepared_rename_key);
2320 }
2321
2322 // Map from nodeid to position in subscriber bitmaps array
map2subscriber_bitmap_index(uint data_node_id) const2323 uint8 map2subscriber_bitmap_index(uint data_node_id) const
2324 {
2325 assert(data_node_id <
2326 (sizeof(m_data_node_id_list)/sizeof(m_data_node_id_list[0])));
2327 const uint8 bitmap_index = m_data_node_id_list[data_node_id];
2328 assert(bitmap_index != 0xFF);
2329 assert(bitmap_index < m_num_bitmaps);
2330 return bitmap_index;
2331 }
2332
report_data_node_failure(unsigned data_node_id)2333 void report_data_node_failure(unsigned data_node_id)
2334 {
2335 uint8 idx= map2subscriber_bitmap_index(data_node_id);
2336 bitmap_clear_all(&subscriber_bitmap[idx]);
2337 DBUG_PRINT("info",("Data node %u failure", data_node_id));
2338 if (opt_ndb_extra_logging)
2339 {
2340 sql_print_information("NDB Schema dist: Data node: %d failed,"
2341 " subscriber bitmask %x%x",
2342 data_node_id,
2343 subscriber_bitmap[idx].bitmap[1],
2344 subscriber_bitmap[idx].bitmap[0]);
2345 }
2346 }
2347
report_subscribe(unsigned data_node_id,unsigned subscriber_node_id)2348 void report_subscribe(unsigned data_node_id, unsigned subscriber_node_id)
2349 {
2350 uint8 idx= map2subscriber_bitmap_index(data_node_id);
2351 assert(subscriber_node_id != 0);
2352 bitmap_set_bit(&subscriber_bitmap[idx], subscriber_node_id);
2353 DBUG_PRINT("info",("Data node %u reported node %u subscribed ",
2354 data_node_id, subscriber_node_id));
2355 if (opt_ndb_extra_logging)
2356 {
2357 sql_print_information("NDB Schema dist: Data node: %d reports "
2358 "subscribe from node %d, subscriber bitmask %x%x",
2359 data_node_id,
2360 subscriber_node_id,
2361 subscriber_bitmap[idx].bitmap[1],
2362 subscriber_bitmap[idx].bitmap[0]);
2363 }
2364 }
2365
report_unsubscribe(unsigned data_node_id,unsigned subscriber_node_id)2366 void report_unsubscribe(unsigned data_node_id, unsigned subscriber_node_id)
2367 {
2368 uint8 idx= map2subscriber_bitmap_index(data_node_id);
2369 assert(subscriber_node_id != 0);
2370 bitmap_clear_bit(&subscriber_bitmap[idx], subscriber_node_id);
2371 DBUG_PRINT("info",("Data node %u reported node %u unsubscribed ",
2372 data_node_id, subscriber_node_id));
2373 if (opt_ndb_extra_logging)
2374 {
2375 sql_print_information("NDB Schema dist: Data node: %d reports "
2376 "subscribe from node %d, subscriber bitmask %x%x",
2377 data_node_id,
2378 subscriber_node_id,
2379 subscriber_bitmap[idx].bitmap[1],
2380 subscriber_bitmap[idx].bitmap[0]);
2381 }
2382 }
2383
get_subscriber_bitmask(MY_BITMAP * servers)2384 void get_subscriber_bitmask(MY_BITMAP* servers)
2385 {
2386 for (unsigned i= 0; i < m_num_bitmaps; i++)
2387 {
2388 bitmap_union(servers, &subscriber_bitmap[i]);
2389 }
2390 }
2391
2392
save_prepared_rename_key(NDB_SHARE_KEY * key)2393 void save_prepared_rename_key(NDB_SHARE_KEY* key)
2394 {
2395 m_prepared_rename_key = key;
2396 }
2397
get_prepared_rename_key() const2398 NDB_SHARE_KEY* get_prepared_rename_key() const {
2399 return m_prepared_rename_key;
2400 }
2401
2402 };
2403
2404 #include "ndb_local_schema.h"
2405
2406 class Ndb_schema_event_handler {
2407
  /*
    Represents one schema operation, i.e one row read from the
    mysql.ndb_schema table, unpacked into plain members.
  */
  class Ndb_schema_op
  {
    // Unpack Ndb_schema_op from event_data pointer.
    // NOTE! Fields are unpacked in ndb_schema table column order,
    // the 'field' pointer is advanced one column at a time.
    void unpack_event(const Ndb_event_data *event_data)
    {
      TABLE *table= event_data->shadow_table;
      Field **field;
      /* unpack blob values */
      uchar* blobs_buffer= 0;
      uint blobs_buffer_size= 0;
      // Temporarily mark all columns as readable while unpacking
      my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
      {
        ptrdiff_t ptrdiff= 0;
        int ret= get_ndb_blobs_value(table, event_data->ndb_value[0],
                                     blobs_buffer, blobs_buffer_size,
                                     ptrdiff);
        if (ret != 0)
        {
          my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
          DBUG_PRINT("info", ("blob read error"));
          assert(FALSE);
        }
      }
      /* db varchar 1 length uchar */
      field= table->field;
      db_length= *(uint8*)(*field)->ptr;
      assert(db_length <= (*field)->field_length);
      assert((*field)->field_length + 1 == sizeof(db));
      memcpy(db, (*field)->ptr + 1, db_length);
      db[db_length]= 0;
      /* name varchar 1 length uchar */
      field++;
      name_length= *(uint8*)(*field)->ptr;
      assert(name_length <= (*field)->field_length);
      assert((*field)->field_length + 1 == sizeof(name));
      memcpy(name, (*field)->ptr + 1, name_length);
      name[name_length]= 0;
      /* slock fixed length */
      field++;
      slock_length= (*field)->field_length;
      assert((*field)->field_length == sizeof(slock_buf));
      memcpy(slock_buf, (*field)->ptr, slock_length);
      /* query blob */
      field++;
      {
        Field_blob *field_blob= (Field_blob*)(*field);
        uint blob_len= field_blob->get_length((*field)->ptr);
        uchar *blob_ptr= 0;
        field_blob->get_ptr(&blob_ptr);
        assert(blob_len == 0 || blob_ptr != 0);
        query_length= blob_len;
        // sql_strmake() makes a NUL-terminated copy of the query
        query= sql_strmake((char*) blob_ptr, blob_len);
      }
      /* node_id */
      field++;
      node_id= (Uint32)((Field_long *)*field)->val_int();
      /* epoch */
      field++;
      epoch= ((Field_long *)*field)->val_int();
      /* id */
      field++;
      id= (Uint32)((Field_long *)*field)->val_int();
      /* version */
      field++;
      version= (Uint32)((Field_long *)*field)->val_int();
      /* type */
      field++;
      type= (Uint32)((Field_long *)*field)->val_int();
      /* free blobs buffer */
      my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
      dbug_tmp_restore_column_map(table->read_set, old_map);
    }

  public:
    uchar db_length;
    char db[64];    // Database name, zero terminated
    uchar name_length;
    char name[64];  // Table name, zero terminated
    uchar slock_length;
    uint32 slock_buf[SCHEMA_SLOCK_SIZE/4]; // Buffer backing the slock bitmap
    MY_BITMAP slock;
    unsigned short query_length;
    char *query;    // The DDL query, copied with sql_strmake()
    Uint64 epoch;
    uint32 node_id; // Node id of the mysqld which wrote the schema op
    uint32 id;
    uint32 version;
    uint32 type;    // Schema op type, see SCHEMA_OP_TYPE
    uint32 any_value;

    /**
      Create a Ndb_schema_op from event_data

      NOTE! The instance is allocated with sql_alloc(), i.e from the
      current THD's mem_root, and is thus not explicitly freed.
    */
    static Ndb_schema_op*
    create(const Ndb_event_data* event_data,
           Uint32 any_value)
    {
      DBUG_ENTER("Ndb_schema_op::create");
      Ndb_schema_op* schema_op=
        (Ndb_schema_op*)sql_alloc(sizeof(Ndb_schema_op));
      bitmap_init(&schema_op->slock,
                  schema_op->slock_buf, 8*SCHEMA_SLOCK_SIZE, FALSE);
      schema_op->unpack_event(event_data);
      schema_op->any_value= any_value;
      DBUG_PRINT("exit", ("%s.%s: query: '%s' type: %d",
                          schema_op->db, schema_op->name,
                          schema_op->query,
                          schema_op->type));
      DBUG_RETURN(schema_op);
    }
  };
2519
2520 static void
print_could_not_discover_error(THD * thd,const Ndb_schema_op * schema)2521 print_could_not_discover_error(THD *thd,
2522 const Ndb_schema_op *schema)
2523 {
2524 sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
2525 "binlog schema event '%s' from node %d. "
2526 "my_errno: %d",
2527 schema->db, schema->name, schema->query,
2528 schema->node_id, my_errno());
2529 thd_print_warning_list(thd, "NDB Binlog");
2530 }
2531
2532
  /*
    Write the schema operation's DDL query to this mysqld's binlog,
    with server_id set to the id of the server where the query
    originated. The THD's server_id and current database are saved
    and restored around the write.
  */
  static void
  write_schema_op_to_binlog(THD *thd, Ndb_schema_op *schema)
  {

    if (!ndb_binlog_running)
    {
      // This mysqld is not writing a binlog
      return;
    }

    /* any_value == 0 means local cluster sourced change that
     * should be logged
     */
    if (ndbcluster_anyvalue_is_reserved(schema->any_value))
    {
      /* Originating SQL node did not want this query logged */
      if (!ndbcluster_anyvalue_is_nologging(schema->any_value))
        sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
                          "query not logged",
                          schema->any_value);
      return;
    }

    Uint32 queryServerId = ndbcluster_anyvalue_get_serverid(schema->any_value);
    /*
      Start with serverId as received AnyValue, in case it's a composite
      (server_id_bits < 31).
      This is for 'future', as currently schema ops do not have composite
      AnyValues.
      In future it may be useful to support *not* mapping composite
      AnyValues to/from Binlogged server-ids.
    */
    Uint32 loggedServerId = schema->any_value;

    if (queryServerId)
    {
      /*
        AnyValue has non-zero serverId, must be a query applied by a slave
        mysqld.
        TODO : Assert that we are running in the Binlog injector thread?
      */
      if (! g_ndb_log_slave_updates)
      {
        /* This MySQLD does not log slave updates */
        return;
      }
    }
    else
    {
      /* No ServerId associated with this query, mark it as ours */
      ndbcluster_anyvalue_set_serverid(loggedServerId, ::server_id);
    }

    /*
      Write the DDL query to binlog with server_id set
      to the server_id where the query originated.
    */
    const uint32 thd_server_id_save= thd->server_id;
    assert(sizeof(thd_server_id_save) == sizeof(thd->server_id));
    thd->server_id = loggedServerId;

    // Temporarily switch THD's current database to the schema op's db
    LEX_CSTRING thd_db_save= thd->db();
    LEX_CSTRING schema_db_lex_cstr= {schema->db, strlen(schema->db)};
    thd->reset_db(schema_db_lex_cstr);

    int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
    thd->binlog_query(THD::STMT_QUERY_TYPE,
                      schema->query, schema->query_length,
                      false, // is_trans
                      true, // direct
                      schema->name[0] == 0 || thd->db().str[0] == 0,
                      errcode);

    // Commit the binlog write
    (void)trans_commit_stmt(thd);

    /*
      Restore original server_id and db after commit
      since the server_id is being used also in the commit logic
    */
    thd->server_id= thd_server_id_save;
    thd->reset_db(thd_db_save);
  }
2616
2617
2618
  /*
    Acknowledge handling of schema operation
    - Inform the other nodes that schema op has
      been completed by this node (by clearing this node's bit
      in the slock column of the row for this op in the
      mysql.ndb_schema table)

    The read-modify-write of the slock bitmap is done in one NDB
    transaction (readTupleExclusive + updateTuple) and retried on
    temporary NDB errors.

    @return 0 (always, errors are logged as warnings)
  */
  int
  ack_schema_op(const char *db, const char *table_name,
                uint32 table_id, uint32 table_version)
  {
    DBUG_ENTER("ack_schema_op");

    const NdbError *ndb_error= 0;
    Ndb *ndb= check_ndb_in_thd(m_thd);
    // Save the Ndb object's current database name, restored on return
    char save_db[FN_HEADLEN];
    strcpy(save_db, ndb->getDatabaseName());

    char tmp_buf[FN_REFLEN];
    NDBDICT *dict= ndb->getDictionary();
    ndb->setDatabaseName(NDB_REP_DB);
    Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
    const NDBTAB *ndbtab= ndbtab_g.get_table();
    NdbTransaction *trans= 0;
    int retries= 100;
    int retry_sleep= 30; /* 30 milliseconds, transaction */
    const NDBCOL *col[SCHEMA_SIZE];

    // Local bitmap backed by 'bitbuf', used to read and then clear
    // this node's bit in the slock column
    MY_BITMAP slock;
    uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
    bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);

    if (ndbtab == 0)
    {
      // Could not open the ndb_schema table, only acceptable
      // error is "Cluster Failure" (4009)
      if (dict->getNdbError().code != 4009)
        abort();
      DBUG_RETURN(0);
    }

    {
      uint i;
      for (i= 0; i < SCHEMA_SIZE; i++)
      {
        col[i]= ndbtab->getColumn(i);
        if (i != SCHEMA_QUERY_I)
        {
          assert(col[i]->getLength() <= (int)sizeof(tmp_buf));
        }
      }
    }

    // Transaction loop, restarted on temporary NDB errors
    while (1)
    {
      if ((trans= ndb->startTransaction()) == 0)
        goto err;
      {
        NdbOperation *op= 0;
        int r= 0;

        /* read the bitmap exlusive */
        r|= (op= trans->getNdbOperation(ndbtab)) == 0;
        assert(r == 0);
        r|= op->readTupleExclusive();
        assert(r == 0);

        /* db */
        ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, (int)strlen(db));
        r|= op->equal(SCHEMA_DB_I, tmp_buf);
        assert(r == 0);
        /* name */
        ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
                         (int)strlen(table_name));
        r|= op->equal(SCHEMA_NAME_I, tmp_buf);
        assert(r == 0);
        /* slock */
        r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
        assert(r == 0);
      }
      if (trans->execute(NdbTransaction::NoCommit))
        goto err;

      if (opt_ndb_extra_logging > 19)
      {
        // Log the slock transition from old to new value
        uint32 copy[SCHEMA_SLOCK_SIZE/4];
        memcpy(copy, bitbuf, sizeof(copy));
        bitmap_clear_bit(&slock, own_nodeid());
        sql_print_information("NDB: reply to %s.%s(%u/%u) from %x%x to %x%x",
                              db, table_name,
                              table_id, table_version,
                              copy[0], copy[1],
                              slock.bitmap[0],
                              slock.bitmap[1]);
      }
      else
      {
        // Clear this node's bit in the slock bitmap
        bitmap_clear_bit(&slock, own_nodeid());
      }

      {
        NdbOperation *op= 0;
        int r= 0;

        /* now update the tuple */
        r|= (op= trans->getNdbOperation(ndbtab)) == 0;
        assert(r == 0);
        r|= op->updateTuple();
        assert(r == 0);

        /* db */
        ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, (int)strlen(db));
        r|= op->equal(SCHEMA_DB_I, tmp_buf);
        assert(r == 0);
        /* name */
        ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
                         (int)strlen(table_name));
        r|= op->equal(SCHEMA_NAME_I, tmp_buf);
        assert(r == 0);
        /* slock */
        r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
        assert(r == 0);
        /* node_id */
        r|= op->setValue(SCHEMA_NODE_ID_I, own_nodeid());
        assert(r == 0);
        /* type */
        r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
        assert(r == 0);
      }
      if (trans->execute(NdbTransaction::Commit,
                         NdbOperation::DefaultAbortOption, 1 /*force send*/) == 0)
      {
        DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
                            own_nodeid(), db, table_name));
        dict->forceGCPWait(1);
        break;
      }
    err:
      // Retry temporary errors (unless the thread has been killed),
      // give up and report anything else
      const NdbError *this_error= trans ?
        &trans->getNdbError() : &ndb->getNdbError();
      if (this_error->status == NdbError::TemporaryError &&
          !thd_killed(m_thd))
      {
        if (retries--)
        {
          if (trans)
            ndb->closeTransaction(trans);
          do_retry_sleep(retry_sleep);
          continue; // retry
        }
      }
      ndb_error= this_error;
      break;
    }

    if (ndb_error)
    {
      sql_print_warning("NDB: Could not release slock on '%s.%s', "
                        "Error code: %d Message: %s",
                        db, table_name,
                        ndb_error->code, ndb_error->message);
    }
    if (trans)
      ndb->closeTransaction(trans);
    ndb->setDatabaseName(save_db);
    DBUG_RETURN(0);
  }
2783
2784
  /*
    Sanity check that the received event is a proper event on the
    mysql.ndb_schema table: the event data is properly initialized
    and its share is the current ndb_schema_share.

    @return true when the event is a valid ndb_schema event
  */
  bool check_is_ndb_schema_event(const Ndb_event_data* event_data) const
  {
    if (!event_data)
    {
      // Received event without event data pointer
      assert(false);
      return false;
    }

    NDB_SHARE *share= event_data->share;
    if (!share)
    {
      // Received event where the event_data is not properly initialized
      assert(false);
      return false;
    }
    assert(event_data->shadow_table);
    assert(event_data->ndb_value[0]);
    assert(event_data->ndb_value[1]);

    // Compare against ndb_schema_share while holding the mutex
    // which protects that pointer
    native_mutex_lock(&ndb_schema_share_mutex);
    if (share != ndb_schema_share)
    {
      // Received event from s_ndb not pointing at the ndb_schema_share
      native_mutex_unlock(&ndb_schema_share_mutex);
      assert(false);
      return false;
    }
    assert(!strncmp(share->db, STRING_WITH_LEN(NDB_REP_DB)));
    assert(!strncmp(share->table_name, STRING_WITH_LEN(NDB_SCHEMA_TABLE)));
    native_mutex_unlock(&ndb_schema_share_mutex);
    return true;
  }
2818
2819
2820 void
handle_after_epoch(Ndb_schema_op * schema)2821 handle_after_epoch(Ndb_schema_op* schema)
2822 {
2823 DBUG_ENTER("handle_after_epoch");
2824 DBUG_PRINT("info", ("Pushing Ndb_schema_op on list to be "
2825 "handled after epoch"));
2826 assert(!is_post_epoch()); // Only before epoch
2827 m_post_epoch_handle_list.push_back(schema, m_mem_root);
2828 DBUG_VOID_RETURN;
2829 }
2830
2831
2832 void
ack_after_epoch(Ndb_schema_op * schema)2833 ack_after_epoch(Ndb_schema_op* schema)
2834 {
2835 DBUG_ENTER("ack_after_epoch");
2836 assert(!is_post_epoch()); // Only before epoch
2837 m_post_epoch_ack_list.push_back(schema, m_mem_root);
2838 DBUG_VOID_RETURN;
2839 }
2840
2841
own_nodeid(void) const2842 uint own_nodeid(void) const
2843 {
2844 return m_own_nodeid;
2845 }
2846
2847
2848 void
ndbapi_invalidate_table(const char * db_name,const char * table_name) const2849 ndbapi_invalidate_table(const char* db_name, const char* table_name) const
2850 {
2851 DBUG_ENTER("ndbapi_invalidate_table");
2852 Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
2853 Ndb *ndb= thd_ndb->ndb;
2854
2855 ndb->setDatabaseName(db_name);
2856 Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
2857 ndbtab_g.invalidate();
2858 DBUG_VOID_RETURN;
2859 }
2860
2861
2862 void
mysqld_close_cached_table(const char * db_name,const char * table_name) const2863 mysqld_close_cached_table(const char* db_name, const char* table_name) const
2864 {
2865 DBUG_ENTER("mysqld_close_cached_table");
2866 // Just mark table as "need reopen"
2867 const bool wait_for_refresh = false;
2868 // Not waiting -> no timeout needed
2869 const ulong timeout = 0;
2870
2871 TABLE_LIST table_list;
2872 memset(&table_list, 0, sizeof(table_list));
2873 table_list.db= (char*)db_name;
2874 table_list.alias= table_list.table_name= (char*)table_name;
2875
2876 close_cached_tables(m_thd, &table_list,
2877 wait_for_refresh, timeout);
2878 DBUG_VOID_RETURN;
2879 }
2880
2881
2882 void
mysqld_write_frm_from_ndb(const char * db_name,const char * table_name) const2883 mysqld_write_frm_from_ndb(const char* db_name,
2884 const char* table_name) const
2885 {
2886 DBUG_ENTER("mysqld_write_frm_from_ndb");
2887 Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
2888 Ndb *ndb= thd_ndb->ndb;
2889 Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
2890 const NDBTAB *ndbtab= ndbtab_g.get_table();
2891 if (!ndbtab)
2892 {
2893 /*
2894 Bug#14773491 reports crash in 'cmp_frm' due to
2895 ndbtab* being NULL -> bail out here
2896 */
2897 sql_print_error("NDB schema: Could not find table '%s.%s' in NDB",
2898 db_name, table_name);
2899 assert(false);
2900 DBUG_VOID_RETURN;
2901 }
2902
2903 char key[FN_REFLEN];
2904 build_table_filename(key, sizeof(key)-1,
2905 db_name, table_name, NullS, 0);
2906
2907 uchar *data= 0, *pack_data= 0;
2908 size_t length, pack_length;
2909
2910 if (readfrm(key, &data, &length) == 0 &&
2911 packfrm(data, length, &pack_data, &pack_length) == 0 &&
2912 cmp_frm(ndbtab, pack_data, pack_length))
2913 {
2914 DBUG_PRINT("info", ("Detected frm change of table %s.%s",
2915 db_name, table_name));
2916
2917 DBUG_DUMP("frm", (uchar*) ndbtab->getFrmData(),
2918 ndbtab->getFrmLength());
2919 my_free(data);
2920 data= NULL;
2921
2922 int error;
2923 if ((error= unpackfrm(&data, &length,
2924 (const uchar*) ndbtab->getFrmData())) ||
2925 (error= writefrm(key, data, length)))
2926 {
2927 sql_print_error("NDB: Failed write frm for %s.%s, error %d",
2928 db_name, table_name, error);
2929 }
2930 }
2931 my_free(data);
2932 my_free(pack_data);
2933 DBUG_VOID_RETURN;
2934 }
2935
2936
get_share(Ndb_schema_op * schema) const2937 NDB_SHARE* get_share(Ndb_schema_op* schema) const
2938 {
2939 DBUG_ENTER("get_share(Ndb_schema_op*)");
2940 char key[FN_REFLEN + 1];
2941 build_table_filename(key, sizeof(key) - 1,
2942 schema->db, schema->name, "", 0);
2943 NDB_SHARE *share= ndbcluster_get_share(key, 0, FALSE, FALSE);
2944 if (share)
2945 {
2946 DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
2947 share->key_string(), share->use_count));
2948 }
2949 DBUG_RETURN(share);
2950 }
2951
2952
2953 bool
check_if_local_tables_in_db(const char * dbname) const2954 check_if_local_tables_in_db(const char *dbname) const
2955 {
2956 DBUG_ENTER("check_if_local_tables_in_db");
2957 DBUG_PRINT("info", ("Looking for files in directory %s", dbname));
2958 Ndb_find_files_list files(m_thd);
2959 char path[FN_REFLEN + 1];
2960 build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0);
2961 if (!files.find_tables(dbname, path))
2962 {
2963 m_thd->clear_error();
2964 DBUG_PRINT("info", ("Failed to find files"));
2965 DBUG_RETURN(true);
2966 }
2967 DBUG_PRINT("info",("found: %d files", files.found_files()));
2968
2969 LEX_STRING *tabname;
2970 while ((tabname= files.next()))
2971 {
2972 DBUG_PRINT("info", ("Found table %s", tabname->str));
2973 if (ndbcluster_check_if_local_table(dbname, tabname->str))
2974 DBUG_RETURN(true);
2975 }
2976
2977 DBUG_RETURN(false);
2978 }
2979
2980
is_local_table(const char * db_name,const char * table_name) const2981 bool is_local_table(const char* db_name, const char* table_name) const
2982 {
2983 return ndbcluster_check_if_local_table(db_name, table_name);
2984 }
2985
2986
  /*
    Handle a CLEAR_SLOCK schema operation: another node has completed
    its part of a schema op and updated the slock bitmap. Copy the
    new slock into the matching NDB_SCHEMA_OBJECT (when one exists)
    and wake up the client thread waiting for the schema op.
  */
  void handle_clear_slock(Ndb_schema_op* schema)
  {
    DBUG_ENTER("handle_clear_slock");

    assert(is_post_epoch());

    char key[FN_REFLEN + 1];
    build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);

    /* Ack to any SQL thread waiting for schema op to complete */
    NDB_SCHEMA_OBJECT *ndb_schema_object= ndb_get_schema_object(key, false);
    if (!ndb_schema_object)
    {
      /* Noone waiting for this schema op in this mysqld */
      if (opt_ndb_extra_logging > 19)
        sql_print_information("NDB: Discarding event...no obj: %s (%u/%u)",
                              key, schema->id, schema->version);
      DBUG_VOID_RETURN;
    }

    if (ndb_schema_object->table_id != schema->id ||
        ndb_schema_object->table_version != schema->version)
    {
      /* Someone waiting, but for another id/version... */
      if (opt_ndb_extra_logging > 19)
        sql_print_information("NDB: Discarding event...key: %s "
                              "non matching id/version [%u/%u] != [%u/%u]",
                              key,
                              ndb_schema_object->table_id,
                              ndb_schema_object->table_version,
                              schema->id,
                              schema->version);
      // Release the reference taken by ndb_get_schema_object()
      ndb_free_schema_object(&ndb_schema_object);
      DBUG_VOID_RETURN;
    }

    // Build bitmask of subscribers
    MY_BITMAP servers;
    bitmap_init(&servers, 0, 256, FALSE);
    bitmap_clear_all(&servers);
    bitmap_set_bit(&servers, own_nodeid()); // "we" are always alive
    m_schema_dist_data.get_subscriber_bitmask(&servers);

    /*
      Copy the latest slock info into the ndb_schema_object so that
      waiter can check if all nodes it's waiting for has answered
    */
    native_mutex_lock(&ndb_schema_object->mutex);
    if (opt_ndb_extra_logging > 19)
    {
      sql_print_information("NDB: CLEAR_SLOCK key: %s(%u/%u) from"
                            " %x%x to %x%x",
                            key, schema->id, schema->version,
                            ndb_schema_object->slock[0],
                            ndb_schema_object->slock[1],
                            schema->slock_buf[0],
                            schema->slock_buf[1]);
    }
    memcpy(ndb_schema_object->slock, schema->slock_buf,
           sizeof(ndb_schema_object->slock));
    DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
              (uchar*)ndb_schema_object->slock_bitmap.bitmap,
              no_bytes_in_map(&ndb_schema_object->slock_bitmap));

    /* remove any unsubscribed from ndb_schema_object->slock */
    bitmap_intersect(&ndb_schema_object->slock_bitmap, &servers);
    bitmap_free(&servers);

    DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
              (uchar*)ndb_schema_object->slock_bitmap.bitmap,
              no_bytes_in_map(&ndb_schema_object->slock_bitmap));
    native_mutex_unlock(&ndb_schema_object->mutex);

    // Release the reference taken by ndb_get_schema_object()
    ndb_free_schema_object(&ndb_schema_object);

    /* Wake up the waiter */
    native_cond_signal(&injector_cond);

    DBUG_VOID_RETURN;
  }
3067
3068
  /*
    Handle commit of an offline (copying) ALTER TABLE performed on
    another mysqld: write the query to this mysqld's binlog,
    invalidate cached table definitions, tear down the old event
    subscription and share, then re-discover the altered table
    from NDB.
  */
  void
  handle_offline_alter_table_commit(Ndb_schema_op* schema)
  {
    DBUG_ENTER("handle_offline_alter_table_commit");

    assert(is_post_epoch()); // Always after epoch

    // Nothing to do when the alter was performed by this node
    if (schema->node_id == own_nodeid())
      DBUG_VOID_RETURN;

    write_schema_op_to_binlog(m_thd, schema);
    ndbapi_invalidate_table(schema->db, schema->name);
    mysqld_close_cached_table(schema->db, schema->name);

    /**
     * Note about get_share() / free_share() references:
     *
     *  1) All shares have a ref count related to their 'discovery' by dictionary.
     *     (Until they are 'dropped')
     *  2) All shares are referred by the binlog thread if its DDL operations
     *     should be replicated with schema events ('share->op != NULL')
     *  3) All shares are ref counted when they are temporarily referred
     *     inside a function. (as below)
     */
    NDB_SHARE *share= get_share(schema); // 3) Temporary pin 'share'
    if (share)
    {
      native_mutex_lock(&share->mutex);
      if (share->op)
      {
        // Tear down the event operation for the pre-alter table:
        // delete its event_data and drop the operation itself
        Ndb_event_data *event_data=
          (Ndb_event_data *) share->op->getCustomData();
        if (event_data)
          delete event_data;
        share->op->setCustomData(NULL);
        {
          Mutex_guard injector_mutex_g(injector_mutex);
          injector_ndb->dropEventOperation(share->op);
        }
        share->op= 0;
        free_share(&share); // Free binlog ref, 2)
        assert(share); // Still ref'ed by 1) & 3)
      }
      native_mutex_unlock(&share->mutex);
      free_share(&share); // Free temporary ref, 3)
      assert(share); // Still ref'ed by dict, 1)

      /**
       * Finally unref. from dictionary, 1).
       * If this was the last share ref, it will be deleted.
       * If there are more (trailing) references, the share will remain as an
       * invisible instance in the share-hash until remaining references are dropped.
       */
      native_mutex_lock(&ndbcluster_mutex);
      handle_trailing_share(m_thd, share); // Unref my 'share', and make any pending refs 'trailing'
      share= 0; // It's gone
      native_mutex_unlock(&ndbcluster_mutex);
    } // if (share)

    // Don't discover into a table which is defined locally
    // (except the distributed privilege tables)
    if (is_local_table(schema->db, schema->name) &&
        !Ndb_dist_priv_util::is_distributed_priv_table(schema->db,
                                                       schema->name))
    {
      sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' "
                      "from binlog schema event '%s' from node %d.",
                      schema->db, schema->name, schema->query,
                      schema->node_id);
      DBUG_VOID_RETURN;
    }

    // Instantiate a new 'share' for the altered table.
    if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
    {
      print_could_not_discover_error(m_thd, schema);
    }
    DBUG_VOID_RETURN;
  }
3146
3147
3148 void
handle_online_alter_table_prepare(Ndb_schema_op * schema)3149 handle_online_alter_table_prepare(Ndb_schema_op* schema)
3150 {
3151 assert(is_post_epoch()); // Always after epoch
3152
3153 ndbapi_invalidate_table(schema->db, schema->name);
3154 mysqld_close_cached_table(schema->db, schema->name);
3155
3156 if (schema->node_id != own_nodeid())
3157 {
3158 write_schema_op_to_binlog(m_thd, schema);
3159 if (!is_local_table(schema->db, schema->name))
3160 {
3161 mysqld_write_frm_from_ndb(schema->db, schema->name);
3162 }
3163 }
3164 }
3165
3166
  /*
    Commit phase of an online ALTER on the participant: re-open the
    binlog shadow table with the new definition and switch the event
    subscription over to the altered table.

    NOTE: the share reference counting here is delicate -- one binlog
    reference is transferred from the old event op to the new one,
    and the reference taken by get_share() below is released at the end.
  */
  void
  handle_online_alter_table_commit(Ndb_schema_op* schema)
  {
    assert(is_post_epoch()); // Always after epoch

    NDB_SHARE *share= get_share(schema); // temporary ref, released below
    if (share)
    {
      if (opt_ndb_extra_logging > 9)
        sql_print_information("NDB Binlog: handling online alter/rename");

      native_mutex_lock(&share->mutex);
      // Close the shadow table opened against the old table definition
      ndb_binlog_close_shadow_table(share);

      if (ndb_binlog_open_shadow_table(m_thd, share))
      {
        sql_print_error("NDB Binlog: Failed to re-open shadow table %s.%s",
                        schema->db, schema->name);
        native_mutex_unlock(&share->mutex);
      }
      else
      {
        /*
          Start subscribing to data changes to the new table definition
        */
        String event_name(INJECTOR_EVENT_LEN);
        ndb_rep_event_name(&event_name, schema->db, schema->name,
                           get_binlog_full(share));
        // Temporarily stash the old event op; ndbcluster_create_event_ops()
        // installs the new op into share->op
        NdbEventOperation *tmp_op= share->op;
        share->new_op= 0;
        share->op= 0;

        Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
        Ndb *ndb= thd_ndb->ndb;
        Ndb_table_guard ndbtab_g(ndb->getDictionary(), schema->name);
        const NDBTAB *ndbtab= ndbtab_g.get_table();
        if (ndbcluster_create_event_ops(m_thd, share, ndbtab,
                                        event_name.c_ptr()))
        {
          sql_print_error("NDB Binlog:"
                          "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
                          event_name.c_ptr());
        }
        else
        {
          // Remember the op for the new table version in new_op ...
          share->new_op= share->op;
        }
        // ... and restore the old op so data up to the switch-over
        // keeps flowing through it
        share->op= tmp_op;
        native_mutex_unlock(&share->mutex);

        if (opt_ndb_extra_logging > 9)
          sql_print_information("NDB Binlog: handling online "
                                "alter/rename done");
      }
      native_mutex_lock(&share->mutex);
      if (share->op && share->new_op)
      {
        // Tear down the old event op and promote the new one
        Ndb_event_data *event_data=
          (Ndb_event_data *) share->op->getCustomData();
        if (event_data)
          delete event_data;
        share->op->setCustomData(NULL);
        {
          // dropEventOperation requires the injector mutex
          Mutex_guard injector_mutex_g(injector_mutex);
          injector_ndb->dropEventOperation(share->op);
        }
        share->op= share->new_op;
        share->new_op= 0;
        free_share(&share); // release the old op's binlog ref
        assert(share); // Should still be ref'ed
      }
      native_mutex_unlock(&share->mutex);

      free_share(&share); // release temporary ref from get_share() above
    }
  }
3243
3244
  /*
    Participant part of DROP TABLE distributed from another mysqld:
    log the op, remove the local table files and drop the NDB_SHARE.
  */
  void
  handle_drop_table(Ndb_schema_op* schema)
  {
    DBUG_ENTER("handle_drop_table");

    assert(is_post_epoch()); // Always after epoch

    if (schema->node_id == own_nodeid())
      DBUG_VOID_RETURN; // Originated here, already handled

    write_schema_op_to_binlog(m_thd, schema);

    Ndb_local_schema::Table tab(m_thd, schema->db, schema->name);
    if (tab.is_local_table())
    {
      /* Table is not a NDB table in this mysqld -> leave it */
      sql_print_error("NDB Binlog: Skipping drop of locally "
                      "defined table '%s.%s' from binlog schema "
                      "event '%s' from node %d. ",
                      schema->db, schema->name, schema->query,
                      schema->node_id);

      // There should be no NDB_SHARE for this table
      assert(!get_share(schema));

      DBUG_VOID_RETURN;
    }

    // Remove the frm and related files from disk
    tab.remove_table();

    NDB_SHARE *share= get_share(schema); // temporary ref.
    if (!share || !share->op)
    {
      // No event op -> the binlog thread has not invalidated
      // the cached table, do it here
      ndbapi_invalidate_table(schema->db, schema->name);
      mysqld_close_cached_table(schema->db, schema->name);
    }
    if (share)
    {
      free_share(&share); // temporary ref.
      assert(share); // Should still be ref'ed
      free_share(&share); // server ref.
    }

    // Invalidate again -- dropping the refs above may have released
    // state that was pinned during the first invalidation
    ndbapi_invalidate_table(schema->db, schema->name);
    mysqld_close_cached_table(schema->db, schema->name);

    DBUG_VOID_RETURN;
  }
3293
3294
3295 /*
3296 The RENAME is performed in two steps.
3297 1) PREPARE_RENAME - sends the new table key to participants
3298 2) RENAME - perform the actual rename
3299 */
3300
3301 void
handle_rename_table_prepare(Ndb_schema_op * schema)3302 handle_rename_table_prepare(Ndb_schema_op* schema)
3303 {
3304 DBUG_ENTER("handle_rename_table_prepare");
3305
3306 assert(is_post_epoch()); // Always after epoch
3307
3308 if (schema->node_id == own_nodeid())
3309 DBUG_VOID_RETURN;
3310
3311 const char* new_key_for_table= schema->query;
3312 DBUG_PRINT("info", ("new_key_for_table: '%s'", new_key_for_table));
3313
3314 // Release potentially previously prepared new_key
3315 {
3316 NDB_SHARE_KEY* old_prepared_key =
3317 m_schema_dist_data.get_prepared_rename_key();
3318 if (old_prepared_key)
3319 NDB_SHARE::free_key(old_prepared_key);
3320 }
3321
3322 // Create a new key save it, then hope for the best(i.e
3323 // that it can be found later when the RENAME arrives)
3324 NDB_SHARE_KEY* new_prepared_key =
3325 NDB_SHARE::create_key(new_key_for_table);
3326 m_schema_dist_data.save_prepared_rename_key(new_prepared_key);
3327
3328 DBUG_VOID_RETURN;
3329 }
3330
3331
3332 void
handle_rename_table(Ndb_schema_op * schema)3333 handle_rename_table(Ndb_schema_op* schema)
3334 {
3335 DBUG_ENTER("handle_rename_table");
3336
3337 assert(is_post_epoch()); // Always after epoch
3338
3339 if (schema->node_id == own_nodeid())
3340 DBUG_VOID_RETURN;
3341
3342 write_schema_op_to_binlog(m_thd, schema);
3343
3344 Ndb_local_schema::Table from(m_thd, schema->db, schema->name);
3345 if (from.is_local_table())
3346 {
3347 /* Tables exists as a local table, print error and leave it */
3348 sql_print_error("NDB Binlog: Skipping renaming locally "
3349 "defined table '%s.%s' from binlog schema "
3350 "event '%s' from node %d. ",
3351 schema->db, schema->name, schema->query,
3352 schema->node_id);
3353 DBUG_VOID_RETURN;
3354 }
3355
3356 NDB_SHARE *share= get_share(schema); // temporary ref.
3357 if (!share || !share->op)
3358 {
3359 ndbapi_invalidate_table(schema->db, schema->name);
3360 mysqld_close_cached_table(schema->db, schema->name);
3361 }
3362 if (share)
3363 free_share(&share); // temporary ref.
3364
3365 share= get_share(schema); // temporary ref.
3366 if (!share)
3367 {
3368 // The RENAME need to find share so it can be renamed
3369 assert(share);
3370 DBUG_VOID_RETURN;
3371 }
3372
3373 NDB_SHARE_KEY* prepared_key =
3374 m_schema_dist_data.get_prepared_rename_key();
3375 if (!prepared_key)
3376 {
3377 // The rename need to have new_key set
3378 // by a previous RENAME_PREPARE
3379 assert(prepared_key);
3380 DBUG_VOID_RETURN;
3381 }
3382
3383 // Rename on participant is always from real to
3384 // real name(i.e neiher old or new name should be a temporary name)
3385 assert(!IS_TMP_PREFIX(schema->name));
3386 assert(!IS_TMP_PREFIX(NDB_SHARE::key_get_table_name(prepared_key)));
3387
3388 // Rename the local table
3389 from.rename_table(NDB_SHARE::key_get_db_name(prepared_key),
3390 NDB_SHARE::key_get_table_name(prepared_key));
3391
3392 // Rename share and release the old key
3393 NDB_SHARE_KEY* old_key = share->key;
3394 ndbcluster_rename_share(m_thd, share, prepared_key);
3395 m_schema_dist_data.save_prepared_rename_key(NULL);
3396 NDB_SHARE::free_key(old_key);
3397
3398 free_share(&share); // temporary ref.
3399
3400 ndbapi_invalidate_table(schema->db, schema->name);
3401 mysqld_close_cached_table(schema->db, schema->name);
3402
3403 DBUG_VOID_RETURN;
3404 }
3405
3406
3407 void
handle_drop_db(Ndb_schema_op * schema)3408 handle_drop_db(Ndb_schema_op* schema)
3409 {
3410 DBUG_ENTER("handle_drop_db");
3411
3412 assert(is_post_epoch()); // Always after epoch
3413
3414 if (schema->node_id == own_nodeid())
3415 DBUG_VOID_RETURN;
3416
3417 write_schema_op_to_binlog(m_thd, schema);
3418
3419 Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
3420 Thd_ndb_options_guard thd_ndb_options(thd_ndb);
3421 // Set NO_LOCK_SCHEMA_OP before 'check_if_local_tables_indb'
3422 // until ndbcluster_find_files does not take GSL
3423 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
3424
3425 if (check_if_local_tables_in_db(schema->db))
3426 {
3427 /* Tables exists as a local table, print error and leave it */
3428 sql_print_error("NDB Binlog: Skipping drop database '%s' since "
3429 "it contained local tables "
3430 "binlog schema event '%s' from node %d. ",
3431 schema->db, schema->query,
3432 schema->node_id);
3433 DBUG_VOID_RETURN;
3434 }
3435
3436 const int no_print_error[1]= {0};
3437 run_query(m_thd, schema->query,
3438 schema->query + schema->query_length,
3439 no_print_error);
3440
3441 DBUG_VOID_RETURN;
3442 }
3443
3444
3445 void
handle_truncate_table(Ndb_schema_op * schema)3446 handle_truncate_table(Ndb_schema_op* schema)
3447 {
3448 DBUG_ENTER("handle_truncate_table");
3449
3450 assert(!is_post_epoch()); // Always directly
3451
3452 if (schema->node_id == own_nodeid())
3453 DBUG_VOID_RETURN;
3454
3455 write_schema_op_to_binlog(m_thd, schema);
3456
3457 NDB_SHARE *share= get_share(schema);
3458 // invalidation already handled by binlog thread
3459 if (!share || !share->op)
3460 {
3461 ndbapi_invalidate_table(schema->db, schema->name);
3462 mysqld_close_cached_table(schema->db, schema->name);
3463 }
3464 if (share)
3465 free_share(&share);
3466
3467 if (is_local_table(schema->db, schema->name))
3468 {
3469 sql_print_error("NDB Binlog: Skipping locally defined table "
3470 "'%s.%s' from binlog schema event '%s' from "
3471 "node %d. ",
3472 schema->db, schema->name, schema->query,
3473 schema->node_id);
3474 DBUG_VOID_RETURN;
3475 }
3476
3477 if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
3478 {
3479 print_could_not_discover_error(m_thd, schema);
3480 }
3481
3482 DBUG_VOID_RETURN;
3483 }
3484
3485
3486 void
handle_create_table(Ndb_schema_op * schema)3487 handle_create_table(Ndb_schema_op* schema)
3488 {
3489 DBUG_ENTER("handle_create_table");
3490
3491 assert(!is_post_epoch()); // Always directly
3492
3493 if (schema->node_id == own_nodeid())
3494 DBUG_VOID_RETURN;
3495
3496 write_schema_op_to_binlog(m_thd, schema);
3497
3498 if (is_local_table(schema->db, schema->name))
3499 {
3500 sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
3501 "binlog schema event '%s' from node %d. ",
3502 schema->db, schema->name, schema->query,
3503 schema->node_id);
3504 DBUG_VOID_RETURN;
3505 }
3506
3507 if (ndb_create_table_from_engine(m_thd, schema->db, schema->name))
3508 {
3509 print_could_not_discover_error(m_thd, schema);
3510 }
3511
3512 DBUG_VOID_RETURN;
3513 }
3514
3515
3516 void
handle_create_db(Ndb_schema_op * schema)3517 handle_create_db(Ndb_schema_op* schema)
3518 {
3519 DBUG_ENTER("handle_create_db");
3520
3521 assert(!is_post_epoch()); // Always directly
3522
3523 if (schema->node_id == own_nodeid())
3524 DBUG_VOID_RETURN;
3525
3526 write_schema_op_to_binlog(m_thd, schema);
3527
3528 Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
3529 Thd_ndb_options_guard thd_ndb_options(thd_ndb);
3530 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
3531 const int no_print_error[1]= {0};
3532 run_query(m_thd, schema->query,
3533 schema->query + schema->query_length,
3534 no_print_error);
3535
3536 DBUG_VOID_RETURN;
3537 }
3538
3539
3540 void
handle_alter_db(Ndb_schema_op * schema)3541 handle_alter_db(Ndb_schema_op* schema)
3542 {
3543 DBUG_ENTER("handle_alter_db");
3544
3545 assert(!is_post_epoch()); // Always directly
3546
3547 if (schema->node_id == own_nodeid())
3548 DBUG_VOID_RETURN;
3549
3550 write_schema_op_to_binlog(m_thd, schema);
3551
3552 Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
3553 Thd_ndb_options_guard thd_ndb_options(thd_ndb);
3554 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
3555 const int no_print_error[1]= {0};
3556 run_query(m_thd, schema->query,
3557 schema->query + schema->query_length,
3558 no_print_error);
3559
3560 DBUG_VOID_RETURN;
3561 }
3562
3563
3564 void
handle_grant_op(Ndb_schema_op * schema)3565 handle_grant_op(Ndb_schema_op* schema)
3566 {
3567 DBUG_ENTER("handle_grant_op");
3568
3569 assert(!is_post_epoch()); // Always directly
3570
3571 if (schema->node_id == own_nodeid())
3572 DBUG_VOID_RETURN;
3573
3574 write_schema_op_to_binlog(m_thd, schema);
3575
3576 if (opt_ndb_extra_logging > 9)
3577 sql_print_information("Got dist_priv event: %s, "
3578 "flushing privileges",
3579 get_schema_type_name(schema->type));
3580
3581 Thd_ndb *thd_ndb= get_thd_ndb(m_thd);
3582 Thd_ndb_options_guard thd_ndb_options(thd_ndb);
3583 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
3584 const int no_print_error[1]= {0};
3585 char *cmd= (char *) "flush privileges";
3586 run_query(m_thd, cmd,
3587 cmd + strlen(cmd),
3588 no_print_error);
3589
3590 DBUG_VOID_RETURN;
3591 }
3592
3593
  /*
    Main dispatcher for a schema operation received from the
    ndb_schema table. Ops that must be serialized with the epoch's
    data events are deferred to the post-epoch lists; the rest are
    handled immediately and acked here.

    @param schema  the received schema operation
    @return 0 (always; errors are logged, not propagated)
  */
  int
  handle_schema_op(Ndb_schema_op* schema)
  {
    DBUG_ENTER("handle_schema_op");
    {
      const SCHEMA_OP_TYPE schema_type= (SCHEMA_OP_TYPE)schema->type;

      if (opt_ndb_extra_logging > 19)
      {
        sql_print_information("NDB: got schema event on %s.%s(%u/%u) query: '%s' type: %s(%d) node: %u slock: %x%x",
                              schema->db, schema->name,
                              schema->id, schema->version,
                              schema->query,
                              get_schema_type_name(schema_type),
                              schema_type,
                              schema->node_id,
                              schema->slock.bitmap[0],
                              schema->slock.bitmap[1]);
      }

      if ((schema->db[0] == 0) && (schema->name[0] == 0))
      {
        /**
         * This happens if there is a schema event on a table (object)
         * that this mysqld does not know about.
         * E.g it had a local table shadowing a ndb table...
         */
        DBUG_RETURN(0);
      }

      switch (schema_type)
      {
      case SOT_CLEAR_SLOCK:
        /*
          handle slock after epoch is completed to ensure that
          schema events get inserted in the binlog after any data
          events
        */
        handle_after_epoch(schema);
        DBUG_RETURN(0);

      case SOT_ALTER_TABLE_COMMIT:
      case SOT_RENAME_TABLE_PREPARE:
      case SOT_ONLINE_ALTER_TABLE_PREPARE:
      case SOT_ONLINE_ALTER_TABLE_COMMIT:
      case SOT_RENAME_TABLE:
      case SOT_DROP_TABLE:
      case SOT_DROP_DB:
        // Defer both handling and ack until the epoch is complete
        handle_after_epoch(schema);
        ack_after_epoch(schema);
        DBUG_RETURN(0);

      case SOT_TRUNCATE_TABLE:
        handle_truncate_table(schema);
        break;

      case SOT_CREATE_TABLE:
        handle_create_table(schema);
        break;

      case SOT_CREATE_DB:
        handle_create_db(schema);
        break;

      case SOT_ALTER_DB:
        handle_alter_db(schema);
        break;

      case SOT_CREATE_USER:
      case SOT_DROP_USER:
      case SOT_RENAME_USER:
      case SOT_GRANT:
      case SOT_REVOKE:
        handle_grant_op(schema);
        break;

      case SOT_TABLESPACE:
      case SOT_LOGFILE_GROUP:
        // Nothing to do locally, just log the statement
        if (schema->node_id == own_nodeid())
          break;
        write_schema_op_to_binlog(m_thd, schema);
        break;

      case SOT_RENAME_TABLE_NEW:
        /*
          Only very old MySQL Server connected to the cluster may
          send this schema operation, ignore it
        */
        sql_print_error("NDB schema: Skipping old schema operation"
                        "(RENAME_TABLE_NEW) on %s.%s",
                        schema->db, schema->name);
        assert(false);
        break;

      }

      /* signal that schema operation has been handled */
      DBUG_DUMP("slock", (uchar*) schema->slock_buf, schema->slock_length);
      if (bitmap_is_set(&schema->slock, own_nodeid()))
      {
        ack_schema_op(schema->db, schema->name,
                      schema->id, schema->version);
      }
    }
    DBUG_RETURN(0);
  }
3700
3701
3702 void
handle_schema_op_post_epoch(Ndb_schema_op * schema)3703 handle_schema_op_post_epoch(Ndb_schema_op* schema)
3704 {
3705 DBUG_ENTER("handle_schema_op_post_epoch");
3706 DBUG_PRINT("enter", ("%s.%s: query: '%s' type: %d",
3707 schema->db, schema->name,
3708 schema->query, schema->type));
3709
3710 {
3711 const SCHEMA_OP_TYPE schema_type= (SCHEMA_OP_TYPE)schema->type;
3712 if (opt_ndb_extra_logging > 9)
3713 sql_print_information("%s - %s.%s",
3714 get_schema_type_name(schema_type),
3715 schema->db ? schema->db : "(null)",
3716 schema->name ? schema->name : "(null)");
3717
3718 switch (schema_type)
3719 {
3720 case SOT_CLEAR_SLOCK:
3721 handle_clear_slock(schema);
3722 break;
3723
3724 case SOT_DROP_DB:
3725 handle_drop_db(schema);
3726 break;
3727
3728 case SOT_DROP_TABLE:
3729 handle_drop_table(schema);
3730 break;
3731
3732 case SOT_RENAME_TABLE_PREPARE:
3733 handle_rename_table_prepare(schema);
3734 break;
3735
3736 case SOT_RENAME_TABLE:
3737 handle_rename_table(schema);
3738 break;
3739
3740 case SOT_ALTER_TABLE_COMMIT:
3741 handle_offline_alter_table_commit(schema);
3742 break;
3743
3744 case SOT_ONLINE_ALTER_TABLE_PREPARE:
3745 handle_online_alter_table_prepare(schema);
3746 break;
3747
3748 case SOT_ONLINE_ALTER_TABLE_COMMIT:
3749 handle_online_alter_table_commit(schema);
3750 break;
3751
3752 default:
3753 assert(FALSE);
3754 }
3755 }
3756
3757 DBUG_VOID_RETURN;
3758 }
3759
  THD* m_thd;              // Thread handle used for all server-side work
  MEM_ROOT* m_mem_root;    // Mem root passed in by owner -- presumably used
                           // for Ndb_schema_op allocation, confirm at callers
  uint m_own_nodeid;       // Nodeid of this mysqld's cluster connection
  Ndb_schema_dist_data& m_schema_dist_data; // Shared schema-dist state (owned by caller)
  bool m_post_epoch;       // True while processing the post-epoch lists

  bool is_post_epoch(void) const { return m_post_epoch; }

  // Schema ops deferred until the epoch completes, see handle_schema_op()
  List<Ndb_schema_op> m_post_epoch_handle_list;
  List<Ndb_schema_op> m_post_epoch_ack_list;

public:
  // Default construction and copying are disallowed
  Ndb_schema_event_handler(); // Not implemented
  Ndb_schema_event_handler(const Ndb_schema_event_handler&); // Not implemented

  Ndb_schema_event_handler(THD* thd, MEM_ROOT* mem_root, uint own_nodeid,
                           Ndb_schema_dist_data& schema_dist_data):
    m_thd(thd), m_mem_root(mem_root), m_own_nodeid(own_nodeid),
    m_schema_dist_data(schema_dist_data),
    m_post_epoch(false)
  {
  }


  ~Ndb_schema_event_handler()
  {
    // There should be no work left todo...
    assert(m_post_epoch_handle_list.elements == 0);
    assert(m_post_epoch_ack_list.elements == 0);
  }
3790
3791
  /*
    Handle one event received on the ndb_schema table's event
    operation: schema ops (INSERT/UPDATE), cluster/table teardown,
    and subscriber bookkeeping.
  */
  void handle_event(Ndb* s_ndb, NdbEventOperation *pOp)
  {
    DBUG_ENTER("handle_event");

    const Ndb_event_data *event_data=
      static_cast<const Ndb_event_data*>(pOp->getCustomData());

    if (!check_is_ndb_schema_event(event_data))
      DBUG_VOID_RETURN;

    const NDBEVENT::TableEvent ev_type= pOp->getEventType();
    switch (ev_type)
    {
    case NDBEVENT::TE_INSERT:
    case NDBEVENT::TE_UPDATE:
    {
      /* ndb_schema table, row INSERTed or UPDATEed*/
      Ndb_schema_op* schema_op=
        Ndb_schema_op::create(event_data, pOp->getAnyValue());
      handle_schema_op(schema_op);
      break;
    }

    case NDBEVENT::TE_DELETE:
      /* ndb_schema table, row DELETEd */
      break;

    case NDBEVENT::TE_CLUSTER_FAILURE:
      if (opt_ndb_extra_logging)
        sql_print_information("NDB Schema dist: cluster failure "
                              "at epoch %u/%u.",
                              (uint)(pOp->getGCI() >> 32),
                              (uint)(pOp->getGCI()));
      // fall through -- cluster failure is handled like a drop of
      // the ndb_schema table
    case NDBEVENT::TE_DROP:
      /* ndb_schema table DROPped */
      if (opt_ndb_extra_logging &&
          ndb_binlog_tables_inited && ndb_binlog_running)
        sql_print_information("NDB Binlog: ndb tables initially "
                              "read only on reconnect.");

      /* release the ndb_schema_share (protected by its own mutex) */
      native_mutex_lock(&ndb_schema_share_mutex);
      free_share(&ndb_schema_share);
      ndb_schema_share= 0;
      ndb_binlog_tables_inited= FALSE;
      ndb_binlog_is_ready= FALSE;
      native_mutex_unlock(&ndb_schema_share_mutex);

      ndb_tdc_close_cached_tables();

      ndb_handle_schema_change(m_thd, s_ndb, pOp, event_data);
      break;

    case NDBEVENT::TE_ALTER:
      /* ndb_schema table ALTERed */
      break;

    case NDBEVENT::TE_NODE_FAILURE:
    {
      /* Remove all subscribers for node */
      m_schema_dist_data.report_data_node_failure(pOp->getNdbdNodeId());
      // Wake up the injector thread so it notices the change
      (void) native_cond_signal(&injector_cond);
      break;
    }

    case NDBEVENT::TE_SUBSCRIBE:
    {
      /* Add node as subscriber */
      m_schema_dist_data.report_subscribe(pOp->getNdbdNodeId(), pOp->getReqNodeId());
      (void) native_cond_signal(&injector_cond);
      break;
    }

    case NDBEVENT::TE_UNSUBSCRIBE:
    {
      /* Remove node as subscriber */
      m_schema_dist_data.report_unsubscribe(pOp->getNdbdNodeId(), pOp->getReqNodeId());
      (void) native_cond_signal(&injector_cond);
      break;
    }

    default:
    {
      sql_print_error("NDB Schema dist: unknown event %u, ignoring...",
                      ev_type);
    }
    }

    DBUG_VOID_RETURN;
  }
3883
3884
post_epoch()3885 void post_epoch()
3886 {
3887 if (unlikely(m_post_epoch_handle_list.elements > 0))
3888 {
3889 // Set the flag used to check that functions are called at correct time
3890 m_post_epoch= true;
3891
3892 /*
3893 process any operations that should be done after
3894 the epoch is complete
3895 */
3896 Ndb_schema_op* schema;
3897 while ((schema= m_post_epoch_handle_list.pop()))
3898 {
3899 handle_schema_op_post_epoch(schema);
3900 }
3901
3902 /*
3903 process any operations that should be unlocked/acked after
3904 the epoch is complete
3905 */
3906 while ((schema= m_post_epoch_ack_list.pop()))
3907 {
3908 ack_schema_op(schema->db, schema->name,
3909 schema->id, schema->version);
3910 }
3911 }
3912 // There should be no work left todo...
3913 assert(m_post_epoch_handle_list.elements == 0);
3914 assert(m_post_epoch_ack_list.elements == 0);
3915 }
3916 };
3917
3918 /*********************************************************************
3919 Internal helper functions for handling of the cluster replication tables
3920 - ndb_binlog_index
3921 - ndb_apply_status
3922 *********************************************************************/
3923
3924 /*
3925 struct to hold the data to be inserted into the
3926 ndb_binlog_index table
3927 */
struct ndb_binlog_index_row {
  ulonglong epoch;                    // Cluster epoch this row describes
  const char *start_master_log_file;  // Binlog file where the epoch starts
  ulonglong start_master_log_pos;     // Binlog position where the epoch starts
  ulong n_inserts;                    // Row-change counters for the epoch
  ulong n_updates;
  ulong n_deletes;
  ulong n_schemaops;                  // Number of schema operations in the epoch

  // Origin of replicated changes -- presumably 0/0 for changes that
  // originated locally; confirm against the injector code
  ulong orig_server_id;
  ulonglong orig_epoch;

  ulong gci;                          // Global checkpoint id -- NOTE(review):
                                      // looks like the high word of epoch, confirm

  // Start of the NEXT epoch in the binlog (only stored when the
  // table schema has the next-pos columns, see write_rows)
  const char *next_master_log_file;
  ulonglong next_master_log_pos;

  struct ndb_binlog_index_row *next;  // Singly-linked list of rows (one per origin)
};
3947
3948
3949 /*
3950 Open the ndb_binlog_index table for writing
3951 */
3952 static int
ndb_binlog_index_table__open(THD * thd,TABLE ** ndb_binlog_index)3953 ndb_binlog_index_table__open(THD *thd,
3954 TABLE **ndb_binlog_index)
3955 {
3956 const char *save_proc_info=
3957 thd_proc_info(thd, "Opening " NDB_REP_DB "." NDB_REP_TABLE);
3958
3959 TABLE_LIST tables;
3960 tables.init_one_table(STRING_WITH_LEN(NDB_REP_DB), // db
3961 STRING_WITH_LEN(NDB_REP_TABLE), // name
3962 NDB_REP_TABLE, // alias
3963 TL_WRITE); // for write
3964
3965 /* Only allow real table to be opened */
3966 tables.required_type= FRMTYPE_TABLE;
3967
3968 const uint flags =
3969 MYSQL_LOCK_IGNORE_TIMEOUT; /* Wait for lock "infinitely" */
3970 if (open_and_lock_tables(thd, &tables, flags))
3971 {
3972 if (thd->killed)
3973 DBUG_PRINT("error", ("NDB Binlog: Opening ndb_binlog_index: killed"));
3974 else
3975 sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'",
3976 thd->get_stmt_da()->mysql_errno(),
3977 thd->get_stmt_da()->message_text());
3978 thd_proc_info(thd, save_proc_info);
3979 return -1;
3980 }
3981 *ndb_binlog_index= tables.table;
3982 thd_proc_info(thd, save_proc_info);
3983 return 0;
3984 }
3985
3986
3987 /*
3988 Write rows to the ndb_binlog_index table
3989 */
/*
  Write the list of per-epoch rows to mysql.ndb_binlog_index.

  With the extended schema (ORIG_SERVERID/ORIG_EPOCH columns present)
  one table row is written per list element; with the old schema the
  counters of all elements are merged into a single row.

  Binlogging is disabled for the duration so these writes are not
  themselves logged. Returns 0 on success, -1 on error.
*/
static int
ndb_binlog_index_table__write_rows(THD *thd,
                                   ndb_binlog_index_row *row)
{
  int error= 0;
  ndb_binlog_index_row *first= row;
  TABLE *ndb_binlog_index= 0;

  /*
    Assume this function is not called with an error set in thd
    (but clear for safety in release version)
   */
  assert(!thd->is_error());
  thd->clear_error();

  /*
    Turn off binlogging to prevent the table changes to be written to
    the binary log.
  */
  tmp_disable_binlog(thd);

  if (ndb_binlog_index_table__open(thd, &ndb_binlog_index))
  {
    if (thd->killed)
      DBUG_PRINT("error", ("NDB Binlog: Unable to lock table ndb_binlog_index, killed"));
    else
      sql_print_error("NDB Binlog: Unable to lock table ndb_binlog_index");
    error= -1;
    goto add_ndb_binlog_index_err;
  }

  // Set all columns to be written
  ndb_binlog_index->use_all_columns();

  do
  {
    ulonglong epoch= 0, orig_epoch= 0;
    uint orig_server_id= 0;

    // Initialize ndb_binlog_index->record[0]
    empty_record(ndb_binlog_index);

    // Columns common to both schema versions
    ndb_binlog_index->field[NBICOL_START_POS]
      ->store(first->start_master_log_pos, true);
    ndb_binlog_index->field[NBICOL_START_FILE]
      ->store(first->start_master_log_file,
              (uint)strlen(first->start_master_log_file),
              &my_charset_bin);
    ndb_binlog_index->field[NBICOL_EPOCH]
      ->store(epoch= first->epoch, true);
    if (ndb_binlog_index->s->fields > NBICOL_ORIG_SERVERID)
    {
      /* Table has ORIG_SERVERID / ORIG_EPOCH columns.
       * Write rows with different ORIG_SERVERID / ORIG_EPOCH
       * separately
       */
      ndb_binlog_index->field[NBICOL_NUM_INSERTS]
        ->store(row->n_inserts, true);
      ndb_binlog_index->field[NBICOL_NUM_UPDATES]
        ->store(row->n_updates, true);
      ndb_binlog_index->field[NBICOL_NUM_DELETES]
        ->store(row->n_deletes, true);
      ndb_binlog_index->field[NBICOL_NUM_SCHEMAOPS]
        ->store(row->n_schemaops, true);
      ndb_binlog_index->field[NBICOL_ORIG_SERVERID]
        ->store(orig_server_id= row->orig_server_id, true);
      ndb_binlog_index->field[NBICOL_ORIG_EPOCH]
        ->store(orig_epoch= row->orig_epoch, true);
      ndb_binlog_index->field[NBICOL_GCI]
        ->store(first->gci, true);

      if (ndb_binlog_index->s->fields > NBICOL_NEXT_POS)
      {
        /* Table has next log pos fields, fill them in */
        ndb_binlog_index->field[NBICOL_NEXT_POS]
          ->store(first->next_master_log_pos, true);
        ndb_binlog_index->field[NBICOL_NEXT_FILE]
          ->store(first->next_master_log_file,
                  (uint)strlen(first->next_master_log_file),
                  &my_charset_bin);
      }
      row= row->next;  // advance, loop writes one table row per list row
    }
    else
    {
      /* Old schema : Table has no separate
       * ORIG_SERVERID / ORIG_EPOCH columns.
       * Merge operation counts and write one row
       */
      while ((row= row->next))
      {
        first->n_inserts+= row->n_inserts;
        first->n_updates+= row->n_updates;
        first->n_deletes+= row->n_deletes;
        first->n_schemaops+= row->n_schemaops;
      }
      ndb_binlog_index->field[NBICOL_NUM_INSERTS]
        ->store((ulonglong)first->n_inserts, true);
      ndb_binlog_index->field[NBICOL_NUM_UPDATES]
        ->store((ulonglong)first->n_updates, true);
      ndb_binlog_index->field[NBICOL_NUM_DELETES]
        ->store((ulonglong)first->n_deletes, true);
      ndb_binlog_index->field[NBICOL_NUM_SCHEMAOPS]
        ->store((ulonglong)first->n_schemaops, true);
    }

    error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0]);

    /* Fault injection to test logging */
    DBUG_EXECUTE_IF("ndb_injector_binlog_index_write_fail_random",
    {
      if ((((uint32) rand()) % 10) == 9)
      {
        sql_print_error("NDB Binlog: Injecting random write failure");
        error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0]);
      }
    });

    if (error)
    {
      sql_print_error("NDB Binlog: Failed writing to ndb_binlog_index for epoch %u/%u "
                      " orig_server_id %u orig_epoch %u/%u "
                      "with error %d.",
                      uint(epoch >> 32), uint(epoch),
                      orig_server_id,
                      uint(orig_epoch >> 32), uint(orig_epoch),
                      error);

      // Log the fate of every row in the list: written OK, the row
      // that failed, or discarded after the failure
      bool seen_error_row = false;
      ndb_binlog_index_row* cursor= first;
      do
      {
        char tmp[128];
        if (ndb_binlog_index->s->fields > NBICOL_ORIG_SERVERID)
          my_snprintf(tmp, sizeof(tmp), "%u/%u,%u,%u/%u",
                      uint(epoch >> 32), uint(epoch),
                      uint(cursor->orig_server_id),
                      uint(cursor->orig_epoch >> 32),
                      uint(cursor->orig_epoch));

        else
          my_snprintf(tmp, sizeof(tmp), "%u/%u", uint(epoch >> 32), uint(epoch));

        bool error_row = (row == (cursor->next));
        sql_print_error("NDB Binlog: Writing row (%s) to ndb_binlog_index - %s",
                        tmp,
                        (error_row?"ERROR":
                         (seen_error_row?"Discarded":
                          "OK")));
        seen_error_row |= error_row;

      } while ((cursor = cursor->next));

      error= -1;
      goto add_ndb_binlog_index_err;
    }
  } while (row);

add_ndb_binlog_index_err:
  /*
    Explicitly commit or rollback the writes(although we normally
    use a non transactional engine for the ndb_binlog_index table)
  */
  thd->get_stmt_da()->set_overwrite_status(true);
  thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
  thd->get_stmt_da()->set_overwrite_status(false);

  // Close the tables this thread has opened
  close_thread_tables(thd);

  /*
    There should be no need for rolling back transaction due to deadlock
    (since ndb_binlog_index is non transactional).
  */
  assert(! thd->transaction_rollback_request);

  // Release MDL locks on the opened table
  thd->mdl_context.release_transactional_locks();

  reenable_binlog(thd);
  return error;
}
4172
4173 /*********************************************************************
4174 Functions for start, stop, wait for ndbcluster binlog thread
4175 *********************************************************************/
4176
ndbcluster_binlog_start()4177 int ndbcluster_binlog_start()
4178 {
4179 DBUG_ENTER("ndbcluster_binlog_start");
4180
4181 if (::server_id == 0)
4182 {
4183 sql_print_warning("NDB: server id set to zero - changes logged to "
4184 "bin log with server id zero will be logged with "
4185 "another server id by slave mysqlds");
4186 }
4187
4188 /*
4189 Check that ServerId is not using the reserved bit or bits reserved
4190 for application use
4191 */
4192 if ((::server_id & 0x1 << 31) || // Reserved bit
4193 !ndbcluster_anyvalue_is_serverid_in_range(::server_id)) // server_id_bits
4194 {
4195 sql_print_error("NDB: server id provided is too large to be represented in "
4196 "opt_server_id_bits or is reserved");
4197 DBUG_RETURN(-1);
4198 }
4199
4200 /*
4201 Check that v2 events are enabled if log-transaction-id is set
4202 */
4203 if (opt_ndb_log_transaction_id &&
4204 log_bin_use_v1_row_events)
4205 {
4206 sql_print_error("NDB: --ndb-log-transaction-id requires v2 Binlog row events "
4207 "but server is using v1.");
4208 DBUG_RETURN(-1);
4209 }
4210
4211 ndb_binlog_thread.init();
4212
4213 native_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
4214 native_cond_init(&injector_cond);
4215 native_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
4216
4217 // The binlog thread globals has been initied and should be freed
4218 ndbcluster_binlog_inited= 1;
4219
4220 /* Start ndb binlog thread */
4221 if (ndb_binlog_thread.start())
4222 {
4223 DBUG_PRINT("error", ("Could not start ndb binlog thread"));
4224 DBUG_RETURN(-1);
4225 }
4226
4227 DBUG_RETURN(0);
4228 }
4229
4230
4231 /**************************************************************
4232 Internal helper functions for creating/dropping ndb events
4233 used by the client sql threads
4234 **************************************************************/
4235 void
ndb_rep_event_name(String * event_name,const char * db,const char * tbl,my_bool full)4236 ndb_rep_event_name(String *event_name,const char *db, const char *tbl,
4237 my_bool full)
4238 {
4239 if (full)
4240 event_name->set_ascii("REPLF$", 6);
4241 else
4242 event_name->set_ascii("REPL$", 5);
4243 event_name->append(db);
4244 #ifdef NDB_WIN32
4245 /*
4246 * Some bright spark decided that we should sometimes have backslashes.
4247 * This causes us pain as the event is db/table and not db\table so trying
4248 * to drop db\table when we meant db/table ends in the event lying around
4249 * after drop table, leading to all sorts of pain.
4250 */
4251 String backslash_sep(1);
4252 backslash_sep.set_ascii("\\",1);
4253
4254 int bsloc;
4255 if((bsloc= event_name->strstr(backslash_sep,0))!=-1)
4256 event_name->replace(bsloc, 1, "/", 1);
4257 #endif
4258 if (tbl)
4259 {
4260 event_name->append('/');
4261 event_name->append(tbl);
4262 }
4263 DBUG_PRINT("info", ("ndb_rep_event_name: %s", event_name->c_ptr()));
4264 }
4265
4266 #ifdef HAVE_NDB_BINLOG
/*
  Apply an Ndb_binlog_type value (read from the mysql.ndb_replication
  table or chosen by default) to the share's binlog flags: full vs
  updated-only row image, WRITE vs UPDATE events, and minimal update
  format.

  Every recognized value except NBT_NO_LOGGING also enables binlog
  logging for the share; unknown values leave the share unchanged.
*/
static void
set_binlog_flags(NDB_SHARE *share,
                 Ndb_binlog_type ndb_binlog_type)
{
  DBUG_ENTER("set_binlog_flags");
  switch (ndb_binlog_type)
  {
  case NBT_NO_LOGGING:
    DBUG_PRINT("info", ("NBT_NO_LOGGING"));
    set_binlog_nologging(share);
    DBUG_VOID_RETURN;
  case NBT_DEFAULT:
    DBUG_PRINT("info", ("NBT_DEFAULT"));
    /* Fall back to the server-wide --ndb-log-* option settings */
    if (opt_ndb_log_updated_only)
    {
      set_binlog_updated_only(share);
    }
    else
    {
      set_binlog_full(share);
    }
    if (opt_ndb_log_update_as_write)
    {
      set_binlog_use_write(share);
    }
    else
    {
      set_binlog_use_update(share);
    }
    if (opt_ndb_log_update_minimal)
    {
      set_binlog_update_minimal(share);
    }
    break;
  case NBT_UPDATED_ONLY:
    DBUG_PRINT("info", ("NBT_UPDATED_ONLY"));
    set_binlog_updated_only(share);
    set_binlog_use_write(share);
    break;
  case NBT_USE_UPDATE:
    DBUG_PRINT("info", ("NBT_USE_UPDATE"));
    /* fall through - USE_UPDATE is handled as UPDATED_ONLY_USE_UPDATE */
  case NBT_UPDATED_ONLY_USE_UPDATE:
    DBUG_PRINT("info", ("NBT_UPDATED_ONLY_USE_UPDATE"));
    set_binlog_updated_only(share);
    set_binlog_use_update(share);
    break;
  case NBT_FULL:
    DBUG_PRINT("info", ("NBT_FULL"));
    set_binlog_full(share);
    set_binlog_use_write(share);
    break;
  case NBT_FULL_USE_UPDATE:
    DBUG_PRINT("info", ("NBT_FULL_USE_UPDATE"));
    set_binlog_full(share);
    set_binlog_use_update(share);
    break;
  case NBT_UPDATED_ONLY_MINIMAL:
    DBUG_PRINT("info", ("NBT_UPDATED_ONLY_MINIMAL"));
    set_binlog_updated_only(share);
    set_binlog_use_update(share);
    set_binlog_update_minimal(share);
    break;
  case NBT_UPDATED_FULL_MINIMAL:
    DBUG_PRINT("info", ("NBT_UPDATED_FULL_MINIMAL"));
    set_binlog_full(share);
    set_binlog_use_update(share);
    set_binlog_update_minimal(share);
    break;
  default:
    /* Unknown value: leave flags as-is, do not enable logging */
    DBUG_VOID_RETURN;
  }
  set_binlog_logging(share);
  DBUG_VOID_RETURN;
}
4341
4342
4343 /*
4344 ndbcluster_get_binlog_replication_info
4345
4346 This function retrieves the data for the given table
4347 from the ndb_replication table.
4348
4349 If the table is not found, or the table does not exist,
4350 then defaults are returned.
4351 */
4352 int
ndbcluster_get_binlog_replication_info(THD * thd,Ndb * ndb,const char * db,const char * table_name,uint server_id,Uint32 * binlog_flags,const st_conflict_fn_def ** conflict_fn,st_conflict_fn_arg * args,Uint32 * num_args)4353 ndbcluster_get_binlog_replication_info(THD *thd, Ndb *ndb,
4354 const char* db,
4355 const char* table_name,
4356 uint server_id,
4357 Uint32* binlog_flags,
4358 const st_conflict_fn_def** conflict_fn,
4359 st_conflict_fn_arg* args,
4360 Uint32* num_args)
4361 {
4362 DBUG_ENTER("ndbcluster_get_binlog_replication_info");
4363
4364 /* Override for ndb_apply_status when logging */
4365 if (opt_ndb_log_apply_status)
4366 {
4367 if (strcmp(db, NDB_REP_DB) == 0 &&
4368 strcmp(table_name, NDB_APPLY_TABLE) == 0)
4369 {
4370 /*
4371 Ensure that we get all columns from ndb_apply_status updates
4372 by forcing FULL event type
4373 Also, ensure that ndb_apply_status events are always logged as
4374 WRITES.
4375 */
4376 DBUG_PRINT("info", ("ndb_apply_status defaulting to FULL, USE_WRITE"));
4377 sql_print_information("NDB : ndb-log-apply-status forcing "
4378 "%s.%s to FULL USE_WRITE",
4379 NDB_REP_DB, NDB_APPLY_TABLE);
4380 *binlog_flags = NBT_FULL;
4381 *conflict_fn = NULL;
4382 *num_args = 0;
4383 DBUG_RETURN(0);
4384 }
4385 }
4386
4387 Ndb_rep_tab_reader rep_tab_reader;
4388
4389 int rc = rep_tab_reader.lookup(ndb,
4390 db,
4391 table_name,
4392 server_id);
4393
4394 const char* msg = rep_tab_reader.get_warning_message();
4395 if (msg != NULL)
4396 {
4397 push_warning_printf(thd, Sql_condition::SL_WARNING,
4398 ER_NDB_REPLICATION_SCHEMA_ERROR,
4399 ER(ER_NDB_REPLICATION_SCHEMA_ERROR),
4400 msg);
4401 sql_print_warning("NDB Binlog: %s",
4402 msg);
4403 }
4404
4405 if (rc != 0)
4406 DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
4407
4408 *binlog_flags= rep_tab_reader.get_binlog_flags();
4409 const char* conflict_fn_spec= rep_tab_reader.get_conflict_fn_spec();
4410
4411 if (conflict_fn_spec != NULL)
4412 {
4413 char msgbuf[ FN_REFLEN ];
4414 if (parse_conflict_fn_spec(conflict_fn_spec,
4415 conflict_fn,
4416 args,
4417 num_args,
4418 msgbuf,
4419 sizeof(msgbuf)) != 0)
4420 {
4421 push_warning_printf(thd, Sql_condition::SL_WARNING,
4422 ER_CONFLICT_FN_PARSE_ERROR,
4423 ER(ER_CONFLICT_FN_PARSE_ERROR),
4424 msgbuf);
4425
4426 /*
4427 Log as well, useful for contexts where the thd's stack of
4428 warnings are ignored
4429 */
4430 if (opt_ndb_extra_logging)
4431 {
4432 sql_print_warning("NDB Slave: Table %s.%s : Parse error on conflict fn : %s",
4433 db, table_name,
4434 msgbuf);
4435 }
4436
4437 DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
4438 }
4439 }
4440 else
4441 {
4442 /* No conflict function specified */
4443 conflict_fn= NULL;
4444 num_args= 0;
4445 }
4446
4447 DBUG_RETURN(0);
4448 }
4449
4450 int
ndbcluster_apply_binlog_replication_info(THD * thd,NDB_SHARE * share,const NDBTAB * ndbtab,const st_conflict_fn_def * conflict_fn,const st_conflict_fn_arg * args,Uint32 num_args,bool do_set_binlog_flags,Uint32 binlog_flags)4451 ndbcluster_apply_binlog_replication_info(THD *thd,
4452 NDB_SHARE *share,
4453 const NDBTAB* ndbtab,
4454 const st_conflict_fn_def* conflict_fn,
4455 const st_conflict_fn_arg* args,
4456 Uint32 num_args,
4457 bool do_set_binlog_flags,
4458 Uint32 binlog_flags)
4459 {
4460 DBUG_ENTER("ndbcluster_apply_binlog_replication_info");
4461 char tmp_buf[FN_REFLEN];
4462
4463 if (do_set_binlog_flags)
4464 {
4465 DBUG_PRINT("info", ("Setting binlog flags to %u", binlog_flags));
4466 set_binlog_flags(share, (enum Ndb_binlog_type)binlog_flags);
4467 }
4468
4469 if (conflict_fn != NULL)
4470 {
4471 if (setup_conflict_fn(get_thd_ndb(thd)->ndb,
4472 &share->m_cfn_share,
4473 share->db,
4474 share->table_name,
4475 ((share->flags & NSF_BLOB_FLAG) != 0),
4476 get_binlog_use_update(share),
4477 ndbtab,
4478 tmp_buf, sizeof(tmp_buf),
4479 conflict_fn,
4480 args,
4481 num_args) == 0)
4482 {
4483 if (opt_ndb_extra_logging)
4484 {
4485 sql_print_information("%s", tmp_buf);
4486 }
4487 }
4488 else
4489 {
4490 /*
4491 Dump setup failure message to error log
4492 for cases where thd warning stack is
4493 ignored
4494 */
4495 sql_print_warning("NDB Slave: Table %s.%s : %s",
4496 share->db,
4497 share->table_name,
4498 tmp_buf);
4499
4500 push_warning_printf(thd, Sql_condition::SL_WARNING,
4501 ER_CONFLICT_FN_PARSE_ERROR,
4502 ER(ER_CONFLICT_FN_PARSE_ERROR),
4503 tmp_buf);
4504
4505 DBUG_RETURN(-1);
4506 }
4507 }
4508 else
4509 {
4510 /* No conflict function specified */
4511 slave_reset_conflict_fn(share->m_cfn_share);
4512 }
4513
4514 DBUG_RETURN(0);
4515 }
4516
4517 int
ndbcluster_read_binlog_replication(THD * thd,Ndb * ndb,NDB_SHARE * share,const NDBTAB * ndbtab,uint server_id,bool do_set_binlog_flags)4518 ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
4519 NDB_SHARE *share,
4520 const NDBTAB *ndbtab,
4521 uint server_id,
4522 bool do_set_binlog_flags)
4523 {
4524 DBUG_ENTER("ndbcluster_read_binlog_replication");
4525 Uint32 binlog_flags;
4526 const st_conflict_fn_def* conflict_fn= NULL;
4527 st_conflict_fn_arg args[MAX_CONFLICT_ARGS];
4528 Uint32 num_args = MAX_CONFLICT_ARGS;
4529
4530 if ((ndbcluster_get_binlog_replication_info(thd, ndb,
4531 share->db,
4532 share->table_name,
4533 server_id,
4534 &binlog_flags,
4535 &conflict_fn,
4536 args,
4537 &num_args) != 0) ||
4538 (ndbcluster_apply_binlog_replication_info(thd,
4539 share,
4540 ndbtab,
4541 conflict_fn,
4542 args,
4543 num_args,
4544 do_set_binlog_flags,
4545 binlog_flags) != 0))
4546 {
4547 DBUG_RETURN(-1);
4548 }
4549
4550 DBUG_RETURN(0);
4551 }
4552 #endif /* HAVE_NDB_BINLOG */
4553
4554 bool
ndbcluster_check_if_local_table(const char * dbname,const char * tabname)4555 ndbcluster_check_if_local_table(const char *dbname, const char *tabname)
4556 {
4557 char key[FN_REFLEN + 1];
4558 char ndb_file[FN_REFLEN + 1];
4559
4560 DBUG_ENTER("ndbcluster_check_if_local_table");
4561 build_table_filename(key, sizeof(key)-1, dbname, tabname, reg_ext, 0);
4562 build_table_filename(ndb_file, sizeof(ndb_file)-1,
4563 dbname, tabname, ha_ndb_ext, 0);
4564 /* Check that any defined table is an ndb table */
4565 DBUG_PRINT("info", ("Looking for file %s and %s", key, ndb_file));
4566 if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK))
4567 {
4568 DBUG_PRINT("info", ("table file %s not on disk, local table", ndb_file));
4569
4570
4571 DBUG_RETURN(true);
4572 }
4573
4574 DBUG_RETURN(false);
4575 }
4576
4577
/*
  Common function for setting up everything for logging a table at
  create/discover.

  Acquires (or creates) the NDB_SHARE for 'key', reads the table's
  replication settings, creates the NDB event if needed and creates
  the event operations for receiving row changes.

  Returns 0 when the table is set up for logging or should not be
  logged at all, -1 on failure.
*/
int ndbcluster_create_binlog_setup(THD *thd, Ndb *ndb, const char *key,
                                   uint key_len,
                                   const char *db,
                                   const char *table_name,
                                   TABLE * table)
{
  DBUG_ENTER("ndbcluster_create_binlog_setup");
  DBUG_PRINT("enter",("key: %s key_len: %d %s.%s",
                      key, key_len, db, table_name));
  assert(! IS_NDB_BLOB_PREFIX(table_name));
  assert(strlen(key) == key_len);

  /* Get (or create) a referenced NDB_SHARE for the table */
  NDB_SHARE* share= get_share(key, table, true, false);
  if (share == 0)
  {
    /**
     * Failed to create share
     */
    DBUG_RETURN(-1);
  }

  native_mutex_lock(&share->mutex);
  if (get_binlog_nologging(share) || share->op != 0 || share->new_op != 0)
  {
    native_mutex_unlock(&share->mutex);
    free_share(&share);
    DBUG_RETURN(0); // replication already setup, or should not
  }

  /* Mark shares that don't need events so they're skipped from now on */
  if (!share->need_events(ndb_binlog_running))
  {
    set_binlog_nologging(share);
    native_mutex_unlock(&share->mutex);
    DBUG_RETURN(0);
  }

  /*
    NOTE: this 'while' is used as a breakable scope - every path either
    returns inside the loop body or 'break's out to the common error
    handling below. It never iterates twice. Tables with temporary
    names (from ALTER/rename) are never set up here.
  */
  while (share && !IS_TMP_PREFIX(table_name))
  {
    /*
      ToDo make sanity check of share so that the table is actually the same
      I.e. we need to do open file from frm in this case
      Currently awaiting this to be fixed in the 4.1 tree in the general
      case
    */

    /* Create the event in NDB */
    ndb->setDatabaseName(db);

    NDBDICT *dict= ndb->getDictionary();
    Ndb_table_guard ndbtab_g(dict, table_name);
    const NDBTAB *ndbtab= ndbtab_g.get_table();
    if (ndbtab == 0)
    {
      /* Table has vanished (or can't be fetched) - fail setup */
      if (opt_ndb_extra_logging)
        sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
                              "%s, %d", key, dict->getNdbError().message,
                              dict->getNdbError().code);
      break; // error
    }
#ifdef HAVE_NDB_BINLOG
    /*
      Apply per-table replication settings (binlog flags, conflict fn)
      from the mysql.ndb_replication table, ignoring any failure.
    */
    ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab,
                                       ::server_id, TRUE);
#endif
    /*
      check if logging turned off for this table
    */
    if ((share->flags & NSF_HIDDEN_PK) &&
        (share->flags & NSF_BLOB_FLAG) &&
        !(share->flags & NSF_NO_BINLOG))
    {
      /* Hidden PK + blobs cannot be binlogged - force logging off */
      DBUG_PRINT("NDB_SHARE", ("NSF_HIDDEN_PK && NSF_BLOB_FLAG -> NSF_NO_BINLOG"));
      share->flags |= NSF_NO_BINLOG;
    }
    if (get_binlog_nologging(share))
    {
      if (opt_ndb_extra_logging)
        sql_print_information("NDB Binlog: NOT logging %s",
                              share->key_string());
      native_mutex_unlock(&share->mutex);
      DBUG_RETURN(0);
    }

    String event_name(INJECTOR_EVENT_LEN);
    ndb_rep_event_name(&event_name, db, table_name, get_binlog_full(share));
    /*
      event should have been created by someone else,
      but let's make sure, and create if it doesn't exist
    */
    const NDBEVENT *ev= dict->getEvent(event_name.c_ptr());
    if (!ev)
    {
      if (ndbcluster_create_event(thd, ndb, ndbtab, event_name.c_ptr(), share))
      {
        sql_print_error("NDB Binlog: "
                        "FAILED CREATE (DISCOVER) TABLE Event: %s",
                        event_name.c_ptr());
        break; // error
      }
      if (opt_ndb_extra_logging)
        sql_print_information("NDB Binlog: "
                              "CREATE (DISCOVER) TABLE Event: %s",
                              event_name.c_ptr());
    }
    else
    {
      /* Event already existed; release the copy returned by getEvent() */
      delete ev;
      if (opt_ndb_extra_logging)
        sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
                              event_name.c_ptr());
    }

    /*
      create the event operations for receiving logging events
    */
    if (ndbcluster_create_event_ops(thd, share,
                                    ndbtab, event_name.c_ptr()))
    {
      sql_print_error("NDB Binlog:"
                      "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
                      event_name.c_ptr());
      /* a warning has been issued to the client */
      break;
    }
    /* Success: the share keeps its reference for the event operation */
    native_mutex_unlock(&share->mutex);
    DBUG_RETURN(0);
  }

  /* Error exit: release the mutex and the reference taken above */
  native_mutex_unlock(&share->mutex);
  free_share(&share);
  DBUG_RETURN(-1);
}
4715
/*
  Create the NDB event used for binlogging the given table.

  push_warning semantics (from the checks below):
    >0 : push a warning to the client for the unsupported
         blob-and-no-PK case
    >1 : additionally push warnings for dictionary errors

  If the event already exists and matches the current table
  (id/version), that is accepted; a stale event from an older table
  incarnation is dropped and recreated.

  Returns 0 on success, -1 on failure.
*/
int
ndbcluster_create_event(THD *thd, Ndb *ndb, const NDBTAB *ndbtab,
                        const char *event_name, NDB_SHARE *share,
                        int push_warning)
{
  DBUG_ENTER("ndbcluster_create_event");
  DBUG_PRINT("enter", ("table: '%s', version: %d",
                       ndbtab->getName(), ndbtab->getObjectVersion()));
  DBUG_PRINT("enter", ("event: '%s', share->key: '%s'",
                       event_name, share->key_string()));

  // Never create event on table with temporary name
  assert(! IS_TMP_PREFIX(ndbtab->getName()));
  // Never create event on the blob table(s)
  assert(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
  assert(share);

  if (get_binlog_nologging(share))
  {
    /* Logging is off for this table - nothing to create */
    if (opt_ndb_extra_logging && ndb_binlog_running)
      sql_print_information("NDB Binlog: NOT logging %s",
                            share->key_string());
    DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d",
                        share->flags, share->flags & NSF_NO_BINLOG));
    DBUG_RETURN(0);
  }

  ndb->setDatabaseName(share->db);
  NDBDICT *dict= ndb->getDictionary();
  NDBEVENT my_event(event_name);
  my_event.setTable(*ndbtab);
  my_event.addTableEvent(NDBEVENT::TE_ALL);
  if (share->flags & NSF_HIDDEN_PK)
  {
    if (share->flags & NSF_BLOB_FLAG)
    {
      /* Unsupported combination: mark the share as not binlogged */
      sql_print_error("NDB Binlog: logging of table %s "
                      "with BLOB attribute and no PK is not supported",
                      share->key_string());
      if (push_warning)
        push_warning_printf(thd, Sql_condition::SL_WARNING,
                            ER_ILLEGAL_HA_CREATE_OPTION,
                            ER(ER_ILLEGAL_HA_CREATE_OPTION),
                            ndbcluster_hton_name,
                            "Binlog of table with BLOB attribute and no PK");

      share->flags|= NSF_NO_BINLOG;
      DBUG_RETURN(-1);
    }
    /* No primary key, subscribe for all attributes */
    my_event.setReport((NDBEVENT::EventReport)
                       (NDBEVENT::ER_ALL | NDBEVENT::ER_DDL));
    DBUG_PRINT("info", ("subscription all"));
  }
  else
  {
    if (strcmp(share->db, NDB_REP_DB) == 0 &&
        strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
    {
      /**
       * ER_SUBSCRIBE is only needed on NDB_SCHEMA_TABLE
       */
      my_event.setReport((NDBEVENT::EventReport)
                         (NDBEVENT::ER_ALL |
                          NDBEVENT::ER_SUBSCRIBE |
                          NDBEVENT::ER_DDL));
      DBUG_PRINT("info", ("subscription all and subscribe"));
    }
    else
    {
      /* Report level follows the share's full/updated-only setting */
      if (get_binlog_full(share))
      {
        my_event.setReport((NDBEVENT::EventReport)
                           (NDBEVENT::ER_ALL | NDBEVENT::ER_DDL));
        DBUG_PRINT("info", ("subscription all"));
      }
      else
      {
        my_event.setReport((NDBEVENT::EventReport)
                           (NDBEVENT::ER_UPDATED | NDBEVENT::ER_DDL));
        DBUG_PRINT("info", ("subscription only updated"));
      }
    }
  }
  if (share->flags & NSF_BLOB_FLAG)
    my_event.mergeEvents(TRUE);

  /* add all columns to the event */
  int n_cols= ndbtab->getNoOfColumns();
  for(int a= 0; a < n_cols; a++)
    my_event.addEventColumn(a);

  if (dict->createEvent(my_event)) // Add event to database
  {
    if (dict->getNdbError().classification != NdbError::SchemaObjectExists)
    {
      /*
        failed, print a warning
      */
      if (push_warning > 1)
        push_warning_printf(thd, Sql_condition::SL_WARNING,
                            ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                            dict->getNdbError().code,
                            dict->getNdbError().message, "NDB");
      sql_print_error("NDB Binlog: Unable to create event in database. "
                      "Event: %s Error Code: %d Message: %s", event_name,
                      dict->getNdbError().code, dict->getNdbError().message);
      DBUG_RETURN(-1);
    }

    /*
      try retrieving the event, if table version/id matches, we will get
      a valid event. Otherwise we have a trailing event from before
    */
    const NDBEVENT *ev;
    if ((ev= dict->getEvent(event_name)))
    {
      /* Existing event is usable as-is */
      delete ev;
      DBUG_RETURN(0);
    }

    /*
      trailing event from before; an error, but try to correct it
    */
    if (dict->getNdbError().code == NDB_INVALID_SCHEMA_OBJECT &&
        dict->dropEvent(my_event.getName(), 1))
    {
      /* Drop of the stale event failed - give up */
      if (push_warning > 1)
        push_warning_printf(thd, Sql_condition::SL_WARNING,
                            ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                            dict->getNdbError().code,
                            dict->getNdbError().message, "NDB");
      sql_print_error("NDB Binlog: Unable to create event in database. "
                      " Attempt to correct with drop failed. "
                      "Event: %s Error Code: %d Message: %s",
                      event_name,
                      dict->getNdbError().code,
                      dict->getNdbError().message);
      DBUG_RETURN(-1);
    }

    /*
      try to add the event again
    */
    if (dict->createEvent(my_event))
    {
      if (push_warning > 1)
        push_warning_printf(thd, Sql_condition::SL_WARNING,
                            ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                            dict->getNdbError().code,
                            dict->getNdbError().message, "NDB");
      sql_print_error("NDB Binlog: Unable to create event in database. "
                      " Attempt to correct with drop ok, but create failed. "
                      "Event: %s Error Code: %d Message: %s",
                      event_name,
                      dict->getNdbError().code,
                      dict->getNdbError().message);
      DBUG_RETURN(-1);
    }
  }

  DBUG_RETURN(0);
}
4879
4880
is_ndb_compatible_type(Field * field)4881 inline int is_ndb_compatible_type(Field *field)
4882 {
4883 return
4884 !(field->flags & BLOB_FLAG) &&
4885 field->type() != MYSQL_TYPE_BIT &&
4886 field->pack_length() != 0;
4887 }
4888
4889 /*
4890 - create eventOperations for receiving log events
4891 - setup ndb recattrs for reception of log event data
4892 - "start" the event operation
4893
4894 used at create/discover of tables
4895 */
4896 int
ndbcluster_create_event_ops(THD * thd,NDB_SHARE * share,const NDBTAB * ndbtab,const char * event_name)4897 ndbcluster_create_event_ops(THD *thd, NDB_SHARE *share,
4898 const NDBTAB *ndbtab, const char *event_name)
4899 {
4900 /*
4901 we are in either create table or rename table so table should be
4902 locked, hence we can work with the share without locks
4903 */
4904
4905 DBUG_ENTER("ndbcluster_create_event_ops");
4906 DBUG_PRINT("enter", ("table: '%s' event: '%s', share->key: '%s'",
4907 ndbtab->getName(), event_name, share->key_string()));
4908
4909 // Never create event on table with temporary name
4910 assert(! IS_TMP_PREFIX(ndbtab->getName()));
4911 // Never create event on the blob table(s)
4912 assert(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
4913 assert(share);
4914
4915 if (get_binlog_nologging(share))
4916 {
4917 DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x",
4918 share->flags));
4919 DBUG_RETURN(0);
4920 }
4921
4922 // Don't allow event ops to be created on distributed priv tables
4923 // they are distributed via ndb_schema
4924 assert(!Ndb_dist_priv_util::is_distributed_priv_table(share->db,
4925 share->table_name));
4926
4927 int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0;
4928 if (!ndb_schema_share && strcmp(share->db, NDB_REP_DB) == 0 &&
4929 strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
4930 do_ndb_schema_share= 1;
4931 else if (!ndb_apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
4932 strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
4933 do_ndb_apply_status_share= 1;
4934 else
4935 #ifdef HAVE_NDB_BINLOG
4936 if (!binlog_filter->db_ok(share->db) ||
4937 !ndb_binlog_running ||
4938 is_exceptions_table(share->table_name))
4939 #endif
4940 {
4941 share->flags|= NSF_NO_BINLOG;
4942 DBUG_RETURN(0);
4943 }
4944
4945 // Check that the share agrees
4946 assert(share->need_events(ndb_binlog_running));
4947
4948 Ndb_event_data *event_data= share->event_data;
4949 if (share->op)
4950 {
4951 event_data= (Ndb_event_data *) share->op->getCustomData();
4952 assert(event_data->share == share);
4953 assert(share->event_data == 0);
4954
4955 assert(share->use_count > 1);
4956 sql_print_error("NDB Binlog: discover reusing old ev op");
4957 /* ndb_share reference ToDo free */
4958 DBUG_PRINT("NDB_SHARE", ("%s ToDo free use_count: %u",
4959 share->key_string(), share->use_count));
4960 free_share(&share); // old event op already has reference
4961 DBUG_RETURN(0);
4962 }
4963
4964 assert(event_data != 0);
4965 TABLE *table= event_data->shadow_table;
4966
4967 int retries= 100;
4968 /*
4969 100 milliseconds, temporary error on schema operation can
4970 take some time to be resolved
4971 */
4972 int retry_sleep= 100;
4973 while (1)
4974 {
4975 Mutex_guard injector_mutex_g(injector_mutex);
4976 Ndb *ndb= injector_ndb;
4977 if (do_ndb_schema_share)
4978 ndb= schema_ndb;
4979
4980 if (ndb == 0)
4981 DBUG_RETURN(-1);
4982
4983 NdbEventOperation* op;
4984 if (do_ndb_schema_share)
4985 op= ndb->createEventOperation(event_name);
4986 else
4987 {
4988 // set injector_ndb database/schema from table internal name
4989 int ret= ndb->setDatabaseAndSchemaName(ndbtab);
4990 assert(ret == 0); NDB_IGNORE_VALUE(ret);
4991 op= ndb->createEventOperation(event_name);
4992 // reset to catch errors
4993 ndb->setDatabaseName("");
4994 }
4995 if (!op)
4996 {
4997 sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
4998 " %s",event_name);
4999 push_warning_printf(thd, Sql_condition::SL_WARNING,
5000 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5001 ndb->getNdbError().code,
5002 ndb->getNdbError().message,
5003 "NDB");
5004 DBUG_RETURN(-1);
5005 }
5006
5007 if (share->flags & NSF_BLOB_FLAG)
5008 op->mergeEvents(TRUE); // currently not inherited from event
5009
5010 const uint n_columns= ndbtab->getNoOfColumns();
5011 const uint n_fields= table->s->fields;
5012 const uint val_length= sizeof(NdbValue) * n_columns;
5013
5014 /*
5015 Allocate memory globally so it can be reused after online alter table
5016 */
5017 if (my_multi_malloc(PSI_INSTRUMENT_ME,
5018 MYF(MY_WME),
5019 &event_data->ndb_value[0],
5020 val_length,
5021 &event_data->ndb_value[1],
5022 val_length,
5023 NULL) == 0)
5024 {
5025 DBUG_PRINT("info", ("Failed to allocate records for event operation"));
5026 DBUG_RETURN(-1);
5027 }
5028
5029 for (uint j= 0; j < n_columns; j++)
5030 {
5031 const char *col_name= ndbtab->getColumn(j)->getName();
5032 NdbValue attr0, attr1;
5033 if (j < n_fields)
5034 {
5035 Field *f= table->field[j];
5036 if (is_ndb_compatible_type(f))
5037 {
5038 DBUG_PRINT("info", ("%s compatible", col_name));
5039 attr0.rec= op->getValue(col_name, (char*) f->ptr);
5040 attr1.rec= op->getPreValue(col_name,
5041 (f->ptr - table->record[0]) +
5042 (char*) table->record[1]);
5043 }
5044 else if (! (f->flags & BLOB_FLAG))
5045 {
5046 DBUG_PRINT("info", ("%s non compatible", col_name));
5047 attr0.rec= op->getValue(col_name);
5048 attr1.rec= op->getPreValue(col_name);
5049 }
5050 else
5051 {
5052 DBUG_PRINT("info", ("%s blob", col_name));
5053 assert(share->flags & NSF_BLOB_FLAG);
5054 attr0.blob= op->getBlobHandle(col_name);
5055 attr1.blob= op->getPreBlobHandle(col_name);
5056 if (attr0.blob == NULL || attr1.blob == NULL)
5057 {
5058 sql_print_error("NDB Binlog: Creating NdbEventOperation"
5059 " blob field %u handles failed (code=%d) for %s",
5060 j, op->getNdbError().code, event_name);
5061 push_warning_printf(thd, Sql_condition::SL_WARNING,
5062 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5063 op->getNdbError().code,
5064 op->getNdbError().message,
5065 "NDB");
5066 ndb->dropEventOperation(op);
5067 DBUG_RETURN(-1);
5068 }
5069 }
5070 }
5071 else
5072 {
5073 DBUG_PRINT("info", ("%s hidden key", col_name));
5074 attr0.rec= op->getValue(col_name);
5075 attr1.rec= op->getPreValue(col_name);
5076 }
5077 event_data->ndb_value[0][j].ptr= attr0.ptr;
5078 event_data->ndb_value[1][j].ptr= attr1.ptr;
5079 DBUG_PRINT("info", ("&event_data->ndb_value[0][%d]: 0x%lx "
5080 "event_data->ndb_value[0][%d]: 0x%lx",
5081 j, (long) &event_data->ndb_value[0][j],
5082 j, (long) attr0.ptr));
5083 DBUG_PRINT("info", ("&event_data->ndb_value[1][%d]: 0x%lx "
5084 "event_data->ndb_value[1][%d]: 0x%lx",
5085 j, (long) &event_data->ndb_value[0][j],
5086 j, (long) attr1.ptr));
5087 }
5088 op->setCustomData((void *) event_data); // set before execute
5089 share->event_data= 0; // take over event data
5090 share->op= op; // assign op in NDB_SHARE
5091
5092 /* Check if user explicitly requires monitoring of empty updates */
5093 if (opt_ndb_log_empty_update)
5094 op->setAllowEmptyUpdate(true);
5095
5096 if (op->execute())
5097 {
5098 share->op= NULL;
5099 retries--;
5100 if (op->getNdbError().status != NdbError::TemporaryError &&
5101 op->getNdbError().code != 1407)
5102 retries= 0;
5103 if (retries == 0)
5104 {
5105 push_warning_printf(thd, Sql_condition::SL_WARNING,
5106 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5107 op->getNdbError().code, op->getNdbError().message,
5108 "NDB");
5109 sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
5110 event_name,
5111 op->getNdbError().code, op->getNdbError().message);
5112 }
5113 share->event_data= event_data;
5114 op->setCustomData(NULL);
5115 ndb->dropEventOperation(op);
5116 if (retries && !thd->killed)
5117 {
5118 do_retry_sleep(retry_sleep);
5119 continue;
5120 }
5121 DBUG_RETURN(-1);
5122 }
5123 break;
5124 }
5125
5126 /* ndb_share reference binlog */
5127 get_share(share);
5128 DBUG_PRINT("NDB_SHARE", ("%s binlog use_count: %u",
5129 share->key_string(), share->use_count));
5130 if (do_ndb_apply_status_share)
5131 {
5132 /* ndb_share reference binlog extra */
5133 ndb_apply_status_share= get_share(share);
5134 DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
5135 share->key_string(), share->use_count));
5136 (void) native_cond_signal(&injector_cond);
5137 }
5138 else if (do_ndb_schema_share)
5139 {
5140 /* ndb_share reference binlog extra */
5141 ndb_schema_share= get_share(share);
5142 DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
5143 share->key_string(), share->use_count));
5144 (void) native_cond_signal(&injector_cond);
5145 }
5146
5147 DBUG_PRINT("info",("%s share->op: 0x%lx share->use_count: %u",
5148 share->key_string(), (long) share->op,
5149 share->use_count));
5150
5151 if (opt_ndb_extra_logging)
5152 sql_print_information("NDB Binlog: logging %s (%s,%s)",
5153 share->key_string(),
5154 get_binlog_full(share) ? "FULL" : "UPDATED",
5155 get_binlog_use_update(share) ? "USE_UPDATE" : "USE_WRITE");
5156 DBUG_RETURN(0);
5157 }
5158
5159 int
ndbcluster_drop_event(THD * thd,Ndb * ndb,NDB_SHARE * share,const char * dbname,const char * tabname)5160 ndbcluster_drop_event(THD *thd, Ndb *ndb, NDB_SHARE *share,
5161 const char *dbname,
5162 const char *tabname)
5163 {
5164 DBUG_ENTER("ndbcluster_drop_event");
5165 /*
5166 There might be 2 types of events setup for the table, we cannot know
5167 which ones are supposed to be there as they may have been created
5168 differently for different mysqld's. So we drop both
5169 */
5170 for (uint i= 0; i < 2; i++)
5171 {
5172 NDBDICT *dict= ndb->getDictionary();
5173 String event_name(INJECTOR_EVENT_LEN);
5174 ndb_rep_event_name(&event_name, dbname, tabname, i);
5175
5176 if (!dict->dropEvent(event_name.c_ptr()))
5177 continue;
5178
5179 if (dict->getNdbError().code != 4710 &&
5180 dict->getNdbError().code != 1419)
5181 {
5182 /* drop event failed for some reason, issue a warning */
5183 push_warning_printf(thd, Sql_condition::SL_WARNING,
5184 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5185 dict->getNdbError().code,
5186 dict->getNdbError().message, "NDB");
5187 /* error is not that the event did not exist */
5188 sql_print_error("NDB Binlog: Unable to drop event in database. "
5189 "Event: %s Error Code: %d Message: %s",
5190 event_name.c_ptr(),
5191 dict->getNdbError().code,
5192 dict->getNdbError().message);
5193 /* ToDo; handle error? */
5194 if (share && share->op &&
5195 share->op->getState() == NdbEventOperation::EO_EXECUTING &&
5196 dict->getNdbError().mysql_code != HA_ERR_NO_CONNECTION)
5197 {
5198 assert(FALSE);
5199 DBUG_RETURN(-1);
5200 }
5201 }
5202 }
5203 DBUG_RETURN(0);
5204 }
5205
5206 /*
  when entering the calling thread should have a share lock if share != 0,
  then the injector thread will have one as well, i.e. share->use_count > 0
5209 (unless it has already dropped... then share->op == 0)
5210 */
5211
/*
  Handle the binlog side of dropping (or renaming away) an NDB table:
  drop the table's NDB events and, if this share has an active event
  operation, wait for the injector thread to release it so the drop
  is binlogged after any preceding row changes.

  type_str         describes the operation, used in log messages
  dbname/tabname   may be NULL to skip dropping the events

  Returns 0 on success, -1 if dropping the events failed.
*/
int
ndbcluster_handle_drop_table(THD *thd, Ndb *ndb, NDB_SHARE *share,
                             const char *type_str,
                             const char * dbname, const char * tabname)
{
  DBUG_ENTER("ndbcluster_handle_drop_table");

  if (dbname && tabname)
  {
    if (ndbcluster_drop_event(thd, ndb, share, dbname, tabname))
      DBUG_RETURN(-1);
  }

  /* Nothing to synchronize unless there is an active event operation */
  if (share == 0 || share->op == 0)
  {
    DBUG_RETURN(0);
  }

  /*
    Synchronized drop between client thread and injector thread is
    necessary in order to maintain ordering in the binlog,
    such that the drop occurs _after_ any inserts/updates/deletes.

    The penalty for this is that the drop table becomes slow.

    This wait is however not strictly necessary to produce a binlog
    that is usable. However the slave does not currently handle
    these out of order, thus we are keeping the SYNC_DROP_ defined
    for now.
  */
  const char *save_proc_info= thd->proc_info;
#define SYNC_DROP_
#ifdef SYNC_DROP_
  thd->proc_info= "Syncing ndb table schema operation and binlog";
  native_mutex_lock(&share->mutex);
  int max_timeout= DEFAULT_SYNC_TIMEOUT;
  /* Wait (in 1 second slices) until the injector clears share->op */
  while (share->op)
  {
    struct timespec abstime;
    set_timespec(&abstime, 1);

    // Unlock the share and wait for injector to signal that
    // something has happened. (NOTE! convoluted in order to
    // only use injector_cond with injector_mutex)
    native_mutex_unlock(&share->mutex);
    native_mutex_lock(&injector_mutex);
    int ret= native_cond_timedwait(&injector_cond,
                                   &injector_mutex,
                                   &abstime);
    native_mutex_unlock(&injector_mutex);
    native_mutex_lock(&share->mutex);

    if (thd->killed ||
        share->op == 0)
      break;
    if (ret)
    {
      /* The 1 second wait timed out: count down, eventually give up */
      max_timeout--;
      if (max_timeout == 0)
      {
        sql_print_error("NDB %s: %s timed out. Ignoring...",
                        type_str, share->key_string());
        assert(false);
        break;
      }
      if (opt_ndb_extra_logging)
        ndb_report_waiting(type_str, max_timeout,
                           type_str, share->key_string(), 0);
    }
  }
  native_mutex_unlock(&share->mutex);
#else
  /* Non-synchronized variant: just detach the event operation */
  native_mutex_lock(&share->mutex);
  share->op= 0;
  native_mutex_unlock(&share->mutex);
#endif
  thd->proc_info= save_proc_info;

  DBUG_RETURN(0);
}
5292
5293
5294 /********************************************************************
5295   Internal helper functions for different events from the storage nodes
5296   used by the ndb injector thread
5297 ********************************************************************/
5298
5299 /*
5300 Unpack a record read from NDB
5301
5302 SYNOPSIS
5303 ndb_unpack_record()
5304 buf Buffer to store read row
5305
5306 NOTE
5307 The data for each row is read directly into the
5308 destination buffer. This function is primarily
5309 called in order to check if any fields should be
5310 set to null.
5311 */
5312
ndb_unpack_record(TABLE * table,NdbValue * value,MY_BITMAP * defined,uchar * buf)5313 static void ndb_unpack_record(TABLE *table, NdbValue *value,
5314 MY_BITMAP *defined, uchar *buf)
5315 {
5316 Field **p_field= table->field, *field= *p_field;
5317 my_ptrdiff_t row_offset= (my_ptrdiff_t) (buf - table->record[0]);
5318 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
5319 DBUG_ENTER("ndb_unpack_record");
5320
5321 /*
5322 Set the filler bits of the null byte, since they are
5323 not touched in the code below.
5324
5325 The filler bits are the MSBs in the last null byte
5326 */
5327 if (table->s->null_bytes > 0)
5328 buf[table->s->null_bytes - 1]|= 256U - (1U <<
5329 table->s->last_null_bit_pos);
5330 /*
5331 Set null flag(s)
5332 */
5333 for ( ; field;
5334 p_field++, value++, field= *p_field)
5335 {
5336 field->set_notnull(row_offset);
5337 if ((*value).ptr)
5338 {
5339 if (!(field->flags & BLOB_FLAG))
5340 {
5341 int is_null= (*value).rec->isNULL();
5342 if (is_null)
5343 {
5344 if (is_null > 0)
5345 {
5346 DBUG_PRINT("info",("[%u] NULL", field->field_index));
5347 field->set_null(row_offset);
5348 }
5349 else
5350 {
5351 DBUG_PRINT("info",("[%u] UNDEFINED", field->field_index));
5352 bitmap_clear_bit(defined, field->field_index);
5353 }
5354 }
5355 else if (field->type() == MYSQL_TYPE_BIT)
5356 {
5357 Field_bit *field_bit= static_cast<Field_bit*>(field);
5358
5359 /*
5360 Move internal field pointer to point to 'buf'. Calling
5361 the correct member function directly since we know the
5362 type of the object.
5363 */
5364 field_bit->Field_bit::move_field_offset(row_offset);
5365 if (field->pack_length() < 5)
5366 {
5367 DBUG_PRINT("info", ("bit field H'%.8X",
5368 (*value).rec->u_32_value()));
5369 field_bit->Field_bit::store((longlong) (*value).rec->u_32_value(),
5370 TRUE);
5371 }
5372 else
5373 {
5374 DBUG_PRINT("info", ("bit field H'%.8X%.8X",
5375 *(Uint32 *)(*value).rec->aRef(),
5376 *((Uint32 *)(*value).rec->aRef()+1)));
5377 #ifdef WORDS_BIGENDIAN
5378 /* lsw is stored first */
5379 Uint32 *buf= (Uint32 *)(*value).rec->aRef();
5380 field_bit->Field_bit::store((((longlong)*buf)
5381 & 0x00000000FFFFFFFFLL)
5382 |
5383 ((((longlong)*(buf+1)) << 32)
5384 & 0xFFFFFFFF00000000LL),
5385 TRUE);
5386 #else
5387 field_bit->Field_bit::store((longlong)
5388 (*value).rec->u_64_value(), TRUE);
5389 #endif
5390 }
5391 /*
5392 Move back internal field pointer to point to original
5393 value (usually record[0]).
5394 */
5395 field_bit->Field_bit::move_field_offset(-row_offset);
5396 DBUG_PRINT("info",("[%u] SET",
5397 (*value).rec->getColumn()->getColumnNo()));
5398 DBUG_DUMP("info", (const uchar*) field->ptr, field->pack_length());
5399 }
5400 else
5401 {
5402 DBUG_PRINT("info",("[%u] SET",
5403 (*value).rec->getColumn()->getColumnNo()));
5404 DBUG_DUMP("info", (const uchar*) field->ptr, field->pack_length());
5405 }
5406 }
5407 else
5408 {
5409 NdbBlob *ndb_blob= (*value).blob;
5410 uint col_no= field->field_index;
5411 int isNull;
5412 ndb_blob->getDefined(isNull);
5413 if (isNull == 1)
5414 {
5415 DBUG_PRINT("info",("[%u] NULL", col_no));
5416 field->set_null(row_offset);
5417 }
5418 else if (isNull == -1)
5419 {
5420 DBUG_PRINT("info",("[%u] UNDEFINED", col_no));
5421 bitmap_clear_bit(defined, col_no);
5422 }
5423 else
5424 {
5425 #ifndef NDEBUG
5426 // pointer vas set in get_ndb_blobs_value
5427 Field_blob *field_blob= (Field_blob*)field;
5428 uchar* ptr;
5429 field_blob->get_ptr(&ptr, row_offset);
5430 uint32 len= field_blob->get_length(row_offset);
5431 DBUG_PRINT("info",("[%u] SET ptr: 0x%lx len: %u",
5432 col_no, (long) ptr, len));
5433 #endif
5434 }
5435 }
5436 }
5437 }
5438 dbug_tmp_restore_column_map(table->write_set, old_map);
5439 DBUG_VOID_RETURN;
5440 }
5441
5442 /*
5443 Handle error states on events from the storage nodes
5444 */
5445 static int
handle_error(NdbEventOperation * pOp)5446 handle_error(NdbEventOperation *pOp)
5447 {
5448 Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
5449 NDB_SHARE *share= event_data->share;
5450 DBUG_ENTER("handle_error");
5451
5452 sql_print_error("NDB Binlog: unhandled error %d for table %s",
5453 pOp->hasError(), share->key_string());
5454 pOp->clearError();
5455 DBUG_RETURN(0);
5456 }
5457
5458
5459 /*
5460 Handle _non_ data events from the storage nodes
5461 */
5462
5463 static
5464 void
handle_non_data_event(THD * thd,NdbEventOperation * pOp,ndb_binlog_index_row & row)5465 handle_non_data_event(THD *thd,
5466 NdbEventOperation *pOp,
5467 ndb_binlog_index_row &row)
5468 {
5469 const Ndb_event_data* event_data=
5470 static_cast<const Ndb_event_data*>(pOp->getCustomData());
5471 NDB_SHARE *share= event_data->share;
5472 const NDBEVENT::TableEvent type= pOp->getEventType();
5473
5474 DBUG_ENTER("handle_non_data_event");
5475 DBUG_PRINT("enter", ("pOp: %p, event_data: %p, share: %p",
5476 pOp, event_data, share));
5477 DBUG_PRINT("enter", ("type: %d", type));
5478
5479 if (type == NDBEVENT::TE_DROP ||
5480 type == NDBEVENT::TE_ALTER)
5481 {
5482 // Count schema events
5483 row.n_schemaops++;
5484 }
5485
5486 switch (type)
5487 {
5488 case NDBEVENT::TE_CLUSTER_FAILURE:
5489 if (opt_ndb_extra_logging)
5490 sql_print_information("NDB Binlog: cluster failure for %s at epoch %u/%u.",
5491 share->key_string(),
5492 (uint)(pOp->getGCI() >> 32),
5493 (uint)(pOp->getGCI()));
5494 // fallthrough
5495 case NDBEVENT::TE_DROP:
5496 if (ndb_apply_status_share == share)
5497 {
5498 if (opt_ndb_extra_logging &&
5499 ndb_binlog_tables_inited && ndb_binlog_running)
5500 sql_print_information("NDB Binlog: ndb tables initially "
5501 "read only on reconnect.");
5502
5503 /* release the ndb_apply_status_share */
5504 free_share(&ndb_apply_status_share);
5505 ndb_apply_status_share= 0;
5506 ndb_binlog_tables_inited= FALSE;
5507 }
5508
5509 ndb_handle_schema_change(thd, injector_ndb, pOp, event_data);
5510 break;
5511
5512 case NDBEVENT::TE_ALTER:
5513 DBUG_PRINT("info", ("TE_ALTER"));
5514 break;
5515
5516 case NDBEVENT::TE_NODE_FAILURE:
5517 case NDBEVENT::TE_SUBSCRIBE:
5518 case NDBEVENT::TE_UNSUBSCRIBE:
5519 /* ignore */
5520 break;
5521
5522 default:
5523 sql_print_error("NDB Binlog: unknown non data event %d for %s. "
5524 "Ignoring...", (unsigned) type, share->key_string());
5525 break;
5526 }
5527
5528 DBUG_VOID_RETURN;
5529 }
5530
5531 /*
5532 Handle data events from the storage nodes
5533 */
5534 inline ndb_binlog_index_row *
ndb_find_binlog_index_row(ndb_binlog_index_row ** rows,uint orig_server_id,int flag)5535 ndb_find_binlog_index_row(ndb_binlog_index_row **rows,
5536 uint orig_server_id, int flag)
5537 {
5538 ndb_binlog_index_row *row= *rows;
5539 if (opt_ndb_log_orig)
5540 {
5541 ndb_binlog_index_row *first= row, *found_id= 0;
5542 for (;;)
5543 {
5544 if (row->orig_server_id == orig_server_id)
5545 {
5546 /* */
5547 if (!flag || !row->orig_epoch)
5548 return row;
5549 if (!found_id)
5550 found_id= row;
5551 }
5552 if (row->orig_server_id == 0)
5553 break;
5554 row= row->next;
5555 if (row == NULL)
5556 {
5557 row= (ndb_binlog_index_row*)sql_alloc(sizeof(ndb_binlog_index_row));
5558 memset(row, 0, sizeof(ndb_binlog_index_row));
5559 row->next= first;
5560 *rows= row;
5561 if (found_id)
5562 {
5563 /*
5564 If we found index_row with same server id already
5565 that row will contain the current stats.
5566 Copy stats over to new and reset old.
5567 */
5568 row->n_inserts= found_id->n_inserts;
5569 row->n_updates= found_id->n_updates;
5570 row->n_deletes= found_id->n_deletes;
5571 found_id->n_inserts= 0;
5572 found_id->n_updates= 0;
5573 found_id->n_deletes= 0;
5574 }
5575 /* keep track of schema ops only on "first" index_row */
5576 row->n_schemaops= first->n_schemaops;
5577 first->n_schemaops= 0;
5578 break;
5579 }
5580 }
5581 row->orig_server_id= orig_server_id;
5582 }
5583 return row;
5584 }
5585
5586
5587 static int
handle_data_event(THD * thd,Ndb * ndb,NdbEventOperation * pOp,ndb_binlog_index_row ** rows,injector::transaction & trans,unsigned & trans_row_count,unsigned & trans_slave_row_count)5588 handle_data_event(THD* thd, Ndb *ndb, NdbEventOperation *pOp,
5589 ndb_binlog_index_row **rows,
5590 injector::transaction &trans,
5591 unsigned &trans_row_count,
5592 unsigned &trans_slave_row_count)
5593 {
5594 Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
5595 TABLE *table= event_data->shadow_table;
5596 NDB_SHARE *share= event_data->share;
5597 bool reflected_op = false;
5598 bool refresh_op = false;
5599 bool read_op = false;
5600
5601 if (pOp != share->op)
5602 {
5603 return 0;
5604 }
5605
5606 uint32 anyValue= pOp->getAnyValue();
5607 if (ndbcluster_anyvalue_is_reserved(anyValue))
5608 {
5609 if (ndbcluster_anyvalue_is_nologging(anyValue))
5610 return 0;
5611
5612 if (ndbcluster_anyvalue_is_reflect_op(anyValue))
5613 {
5614 DBUG_PRINT("info", ("Anyvalue -> Reflect (%u)", anyValue));
5615 reflected_op = true;
5616 anyValue = 0;
5617 }
5618 else if (ndbcluster_anyvalue_is_refresh_op(anyValue))
5619 {
5620 DBUG_PRINT("info", ("Anyvalue -> Refresh"));
5621 refresh_op = true;
5622 anyValue = 0;
5623 }
5624 else if (ndbcluster_anyvalue_is_read_op(anyValue))
5625 {
5626 DBUG_PRINT("info", ("Anyvalue -> Read"));
5627 read_op = true;
5628 anyValue = 0;
5629 }
5630 else
5631 {
5632 sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
5633 "event not logged",
5634 anyValue);
5635 return 0;
5636 }
5637 }
5638
5639 uint32 originating_server_id= ndbcluster_anyvalue_get_serverid(anyValue);
5640 bool log_this_slave_update = g_ndb_log_slave_updates;
5641 bool count_this_event = true;
5642
5643 if (share == ndb_apply_status_share)
5644 {
5645 /*
5646 Note that option values are read without synchronisation w.r.t.
5647 thread setting option variable or epoch boundaries.
5648 */
5649 if (opt_ndb_log_apply_status ||
5650 opt_ndb_log_orig)
5651 {
5652 Uint32 ndb_apply_status_logging_server_id= originating_server_id;
5653 Uint32 ndb_apply_status_server_id= 0;
5654 Uint64 ndb_apply_status_epoch= 0;
5655 bool event_has_data = false;
5656
5657 switch(pOp->getEventType())
5658 {
5659 case NDBEVENT::TE_INSERT:
5660 case NDBEVENT::TE_UPDATE:
5661 event_has_data = true;
5662 break;
5663
5664 case NDBEVENT::TE_DELETE:
5665 break;
5666 default:
5667 /* We should REALLY never get here */
5668 abort();
5669 }
5670
5671 if (likely( event_has_data ))
5672 {
5673 /* unpack data to fetch orig_server_id and orig_epoch */
5674 uint n_fields= table->s->fields;
5675 MY_BITMAP b;
5676 uint32 bitbuf[128 / (sizeof(uint32) * 8)];
5677 bitmap_init(&b, bitbuf, n_fields, FALSE);
5678 bitmap_set_all(&b);
5679 ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
5680 ndb_apply_status_server_id= (uint)((Field_long *)table->field[0])->val_int();
5681 ndb_apply_status_epoch= ((Field_longlong *)table->field[1])->val_int();
5682
5683 if (opt_ndb_log_apply_status)
5684 {
5685 /*
5686 Determine if event came from our immediate Master server
5687 Ignore locally manually sourced and reserved events
5688 */
5689 if ((ndb_apply_status_logging_server_id != 0) &&
5690 (! ndbcluster_anyvalue_is_reserved(ndb_apply_status_logging_server_id)))
5691 {
5692 bool isFromImmediateMaster = (ndb_apply_status_server_id ==
5693 ndb_apply_status_logging_server_id);
5694
5695 if (isFromImmediateMaster)
5696 {
5697 /*
5698 We log this event with our server-id so that it
5699 propagates back to the originating Master (our
5700 immediate Master)
5701 */
5702 assert(ndb_apply_status_logging_server_id != ::server_id);
5703
5704 originating_server_id= 0; /* Will be set to our ::serverid below */
5705 }
5706 }
5707 }
5708
5709 if (opt_ndb_log_orig)
5710 {
5711 /* store */
5712 ndb_binlog_index_row *row= ndb_find_binlog_index_row
5713 (rows, ndb_apply_status_server_id, 1);
5714 row->orig_epoch= ndb_apply_status_epoch;
5715 }
5716 }
5717 } // opt_ndb_log_apply_status || opt_ndb_log_orig)
5718
5719 if (opt_ndb_log_apply_status)
5720 {
5721 /* We are logging ndb_apply_status changes
5722 * Don't count this event as making an epoch non-empty
5723 * Log this event in the Binlog
5724 */
5725 count_this_event = false;
5726 log_this_slave_update = true;
5727 }
5728 else
5729 {
5730 /* Not logging ndb_apply_status updates, discard this event now */
5731 return 0;
5732 }
5733 }
5734
5735 if (originating_server_id == 0)
5736 originating_server_id= ::server_id;
5737 else
5738 {
5739 assert(!reflected_op && !refresh_op);
5740 /* Track that we received a replicated row event */
5741 if (likely( count_this_event ))
5742 trans_slave_row_count++;
5743
5744 if (!log_this_slave_update)
5745 {
5746 /*
5747 This event comes from a slave applier since it has an originating
5748 server id set. Since option to log slave updates is not set, skip it.
5749 */
5750 return 0;
5751 }
5752 }
5753
5754 /*
5755 Start with logged_server_id as AnyValue in case it's a composite
5756 (server_id_bits < 31). This way any user-values are passed-through
5757 to the Binlog in the high bits of the event's Server Id.
5758 In future it may be useful to support *not* mapping composite
5759 AnyValues to/from Binlogged server-ids.
5760 */
5761 uint32 logged_server_id= anyValue;
5762 ndbcluster_anyvalue_set_serverid(logged_server_id, originating_server_id);
5763
5764 /*
5765 Get NdbApi transaction id for this event to put into Binlog
5766 */
5767 Ndb_binlog_extra_row_info extra_row_info;
5768 const uchar* extra_row_info_ptr = NULL;
5769 Uint16 erif_flags = 0;
5770 if (opt_ndb_log_transaction_id)
5771 {
5772 erif_flags |= Ndb_binlog_extra_row_info::NDB_ERIF_TRANSID;
5773 extra_row_info.setTransactionId(pOp->getTransId());
5774 }
5775
5776 /* Set conflict flags member if necessary */
5777 Uint16 event_conflict_flags = 0;
5778 assert(! (reflected_op && refresh_op));
5779 if (reflected_op)
5780 {
5781 event_conflict_flags |= NDB_ERIF_CFT_REFLECT_OP;
5782 }
5783 else if (refresh_op)
5784 {
5785 event_conflict_flags |= NDB_ERIF_CFT_REFRESH_OP;
5786 }
5787 else if (read_op)
5788 {
5789 event_conflict_flags |= NDB_ERIF_CFT_READ_OP;
5790 }
5791
5792 DBUG_EXECUTE_IF("ndb_injector_set_event_conflict_flags",
5793 {
5794 event_conflict_flags = 0xfafa;
5795 });
5796 if (event_conflict_flags != 0)
5797 {
5798 erif_flags |= Ndb_binlog_extra_row_info::NDB_ERIF_CFT_FLAGS;
5799 extra_row_info.setConflictFlags(event_conflict_flags);
5800 }
5801
5802 if (erif_flags != 0)
5803 {
5804 extra_row_info.setFlags(erif_flags);
5805 if (likely(!log_bin_use_v1_row_events))
5806 {
5807 extra_row_info_ptr = extra_row_info.generateBuffer();
5808 }
5809 else
5810 {
5811 /**
5812 * Can't put the metadata in a v1 event
5813 * Produce 1 warning at most
5814 */
5815 if (!g_injector_v1_warning_emitted)
5816 {
5817 sql_print_error("NDB: Binlog Injector discarding row event "
5818 "meta data as server is using v1 row events. "
5819 "(%u %x)",
5820 opt_ndb_log_transaction_id,
5821 event_conflict_flags);
5822
5823 g_injector_v1_warning_emitted = true;
5824 }
5825 }
5826 }
5827
5828 assert(trans.good());
5829 assert(table != 0);
5830
5831 dbug_print_table("table", table);
5832
5833 uint n_fields= table->s->fields;
5834 DBUG_PRINT("info", ("Assuming %u columns for table %s",
5835 n_fields, table->s->table_name.str));
5836 MY_BITMAP b;
5837 my_bitmap_map bitbuf[(NDB_MAX_ATTRIBUTES_IN_TABLE +
5838 8*sizeof(my_bitmap_map) - 1) /
5839 (8*sizeof(my_bitmap_map))];
5840 bitmap_init(&b, bitbuf, n_fields, FALSE);
5841 bitmap_set_all(&b);
5842
5843 /*
5844 row data is already in table->record[0]
5845 As we told the NdbEventOperation to do this
5846 (saves moving data about many times)
5847 */
5848
5849 /*
5850 for now malloc/free blobs buffer each time
5851 TODO if possible share single permanent buffer with handlers
5852 */
5853 uchar* blobs_buffer[2] = { 0, 0 };
5854 uint blobs_buffer_size[2] = { 0, 0 };
5855
5856 ndb_binlog_index_row *row=
5857 ndb_find_binlog_index_row(rows, originating_server_id, 0);
5858
5859 switch(pOp->getEventType())
5860 {
5861 case NDBEVENT::TE_INSERT:
5862 if (likely( count_this_event ))
5863 {
5864 row->n_inserts++;
5865 trans_row_count++;
5866 }
5867 DBUG_PRINT("info", ("INSERT INTO %s.%s",
5868 table->s->db.str, table->s->table_name.str));
5869 {
5870 int ret;
5871 if (share->flags & NSF_BLOB_FLAG)
5872 {
5873 my_ptrdiff_t ptrdiff= 0;
5874 ret = get_ndb_blobs_value(table, event_data->ndb_value[0],
5875 blobs_buffer[0],
5876 blobs_buffer_size[0],
5877 ptrdiff);
5878 assert(ret == 0);
5879 }
5880 ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
5881 ret = trans.write_row(logged_server_id,
5882 injector::transaction::table(table, true),
5883 &b, n_fields, table->record[0],
5884 extra_row_info_ptr);
5885 assert(ret == 0);
5886 }
5887 break;
5888 case NDBEVENT::TE_DELETE:
5889 if (likely( count_this_event ))
5890 {
5891 row->n_deletes++;
5892 trans_row_count++;
5893 }
5894 DBUG_PRINT("info",("DELETE FROM %s.%s",
5895 table->s->db.str, table->s->table_name.str));
5896 {
5897 /*
5898 table->record[0] contains only the primary key in this case
5899 since we do not have an after image
5900 */
5901 int n;
5902 if (!get_binlog_full(share) && table->s->primary_key != MAX_KEY)
5903 n= 0; /*
5904 use the primary key only as it save time and space and
5905 it is the only thing needed to log the delete
5906 */
5907 else
5908 n= 1; /*
5909 we use the before values since we don't have a primary key
5910 since the mysql server does not handle the hidden primary
5911 key
5912 */
5913
5914 int ret;
5915 if (share->flags & NSF_BLOB_FLAG)
5916 {
5917 my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
5918 ret = get_ndb_blobs_value(table, event_data->ndb_value[n],
5919 blobs_buffer[n],
5920 blobs_buffer_size[n],
5921 ptrdiff);
5922 assert(ret == 0);
5923 }
5924 ndb_unpack_record(table, event_data->ndb_value[n], &b, table->record[n]);
5925 DBUG_EXECUTE("info", print_records(table, table->record[n]););
5926 ret = trans.delete_row(logged_server_id,
5927 injector::transaction::table(table, true),
5928 &b, n_fields, table->record[n],
5929 extra_row_info_ptr);
5930 assert(ret == 0);
5931 }
5932 break;
5933 case NDBEVENT::TE_UPDATE:
5934 if (likely( count_this_event ))
5935 {
5936 row->n_updates++;
5937 trans_row_count++;
5938 }
5939 DBUG_PRINT("info", ("UPDATE %s.%s",
5940 table->s->db.str, table->s->table_name.str));
5941 {
5942 int ret;
5943 if (share->flags & NSF_BLOB_FLAG)
5944 {
5945 my_ptrdiff_t ptrdiff= 0;
5946 ret = get_ndb_blobs_value(table, event_data->ndb_value[0],
5947 blobs_buffer[0],
5948 blobs_buffer_size[0],
5949 ptrdiff);
5950 assert(ret == 0);
5951 }
5952 ndb_unpack_record(table, event_data->ndb_value[0],
5953 &b, table->record[0]);
5954 DBUG_EXECUTE("info", print_records(table, table->record[0]););
5955 if (table->s->primary_key != MAX_KEY &&
5956 !get_binlog_use_update(share))
5957 {
5958 /*
5959 since table has a primary key, we can do a write
5960 using only after values
5961 */
5962 ret = trans.write_row(logged_server_id,
5963 injector::transaction::table(table, true),
5964 &b, n_fields, table->record[0],// after values
5965 extra_row_info_ptr);
5966 assert(ret == 0);
5967 }
5968 else
5969 {
5970 /*
5971 mysql server cannot handle the ndb hidden key and
5972 therefore needs the before image as well
5973 */
5974 if (share->flags & NSF_BLOB_FLAG)
5975 {
5976 my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
5977 ret = get_ndb_blobs_value(table, event_data->ndb_value[1],
5978 blobs_buffer[1],
5979 blobs_buffer_size[1],
5980 ptrdiff);
5981 assert(ret == 0);
5982 }
5983 ndb_unpack_record(table, event_data->ndb_value[1], &b, table->record[1]);
5984 DBUG_EXECUTE("info", print_records(table, table->record[1]););
5985
5986 MY_BITMAP col_bitmap_before_update;
5987 my_bitmap_map bitbuf[(NDB_MAX_ATTRIBUTES_IN_TABLE +
5988 8*sizeof(my_bitmap_map) - 1) /
5989 (8*sizeof(my_bitmap_map))];
5990 bitmap_init(&col_bitmap_before_update, bitbuf, n_fields, FALSE);
5991 if (get_binlog_update_minimal(share))
5992 {
5993 event_data->generate_minimal_bitmap(&col_bitmap_before_update, &b);
5994 }
5995 else
5996 {
5997 bitmap_copy(&col_bitmap_before_update, &b);
5998 }
5999
6000 ret = trans.update_row(logged_server_id,
6001 injector::transaction::table(table, true),
6002 &col_bitmap_before_update, &b, n_fields,
6003 table->record[1], // before values
6004 table->record[0], // after values
6005 extra_row_info_ptr);
6006 assert(ret == 0);
6007 }
6008 }
6009 break;
6010 default:
6011 /* We should REALLY never get here. */
6012 DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
6013 break;
6014 }
6015
6016 if (share->flags & NSF_BLOB_FLAG)
6017 {
6018 my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
6019 my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
6020 }
6021
6022 return 0;
6023 }
6024
6025
6026 /****************************************************************
6027 Injector thread main loop
6028 ****************************************************************/
6029
6030 static void
remove_event_operations(Ndb * ndb)6031 remove_event_operations(Ndb* ndb)
6032 {
6033 DBUG_ENTER("remove_event_operations");
6034 NdbEventOperation *op;
6035 while ((op= ndb->getEventOperation()))
6036 {
6037 assert(!IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
6038 DBUG_PRINT("info", ("removing event operation on %s",
6039 op->getEvent()->getName()));
6040
6041 Ndb_event_data *event_data= (Ndb_event_data *) op->getCustomData();
6042 assert(event_data);
6043
6044 NDB_SHARE *share= event_data->share;
6045 assert(share != NULL);
6046 assert(share->op == op || share->new_op == op);
6047
6048 delete event_data;
6049 op->setCustomData(NULL);
6050
6051 native_mutex_lock(&share->mutex);
6052 share->op= 0;
6053 share->new_op= 0;
6054 native_mutex_unlock(&share->mutex);
6055
6056 DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
6057 share->key_string(), share->use_count));
6058 free_share(&share);
6059
6060 ndb->dropEventOperation(op);
6061 }
6062 DBUG_VOID_RETURN;
6063 }
6064
6065 extern long long g_event_data_count;
6066 extern long long g_event_nondata_count;
6067 extern long long g_event_bytes_count;
6068
updateInjectorStats(Ndb * schemaNdb,Ndb * dataNdb)6069 void updateInjectorStats(Ndb* schemaNdb, Ndb* dataNdb)
6070 {
6071 /* Update globals to sum of totals from each listening
6072 * Ndb object
6073 */
6074 g_event_data_count =
6075 schemaNdb->getClientStat(Ndb::DataEventsRecvdCount) +
6076 dataNdb->getClientStat(Ndb::DataEventsRecvdCount);
6077 g_event_nondata_count =
6078 schemaNdb->getClientStat(Ndb::NonDataEventsRecvdCount) +
6079 dataNdb->getClientStat(Ndb::NonDataEventsRecvdCount);
6080 g_event_bytes_count =
6081 schemaNdb->getClientStat(Ndb::EventBytesRecvdCount) +
6082 dataNdb->getClientStat(Ndb::EventBytesRecvdCount);
6083 }
6084
6085 /**
6086 injectApplyStatusWriteRow
6087
6088 Inject a WRITE_ROW event on the ndb_apply_status table into
6089 the Binlog.
6090 This contains our server_id and the supplied epoch number.
6091 When applied on the Slave it gives a transactional position
6092 marker
6093 */
6094 static
6095 bool
injectApplyStatusWriteRow(injector::transaction & trans,ulonglong gci)6096 injectApplyStatusWriteRow(injector::transaction& trans,
6097 ulonglong gci)
6098 {
6099 DBUG_ENTER("injectApplyStatusWriteRow");
6100 if (ndb_apply_status_share == NULL)
6101 {
6102 sql_print_error("NDB: Could not get apply status share");
6103 assert(ndb_apply_status_share != NULL);
6104 DBUG_RETURN(false);
6105 }
6106
6107 longlong gci_to_store = (longlong) gci;
6108
6109 #ifndef NDEBUG
6110 DBUG_EXECUTE_IF("ndb_binlog_injector_cycle_gcis",
6111 {
6112 ulonglong gciHi = ((gci_to_store >> 32)
6113 & 0xffffffff);
6114 ulonglong gciLo = (gci_to_store & 0xffffffff);
6115 gciHi = (gciHi % 3);
6116 sql_print_warning("NDB Binlog injector cycling gcis (%llu -> %llu)",
6117 gci_to_store, (gciHi << 32) + gciLo);
6118 gci_to_store = (gciHi << 32) + gciLo;
6119 });
6120 DBUG_EXECUTE_IF("ndb_binlog_injector_repeat_gcis",
6121 {
6122 ulonglong gciHi = ((gci_to_store >> 32)
6123 & 0xffffffff);
6124 ulonglong gciLo = (gci_to_store & 0xffffffff);
6125 gciHi=0xffffff00;
6126 gciLo=0;
6127 sql_print_warning("NDB Binlog injector repeating gcis (%llu -> %llu)",
6128 gci_to_store, (gciHi << 32) + gciLo);
6129 gci_to_store = (gciHi << 32) + gciLo;
6130 });
6131 #endif
6132
6133 /* Build row buffer for generated ndb_apply_status
6134 WRITE_ROW event
6135 First get the relevant table structure.
6136 */
6137 assert(!ndb_apply_status_share->event_data);
6138 assert(ndb_apply_status_share->op);
6139 Ndb_event_data* event_data=
6140 (Ndb_event_data *) ndb_apply_status_share->op->getCustomData();
6141 assert(event_data);
6142 assert(event_data->shadow_table);
6143 TABLE* apply_status_table= event_data->shadow_table;
6144
6145 /*
6146 Intialize apply_status_table->record[0]
6147
6148 When iterating past the end of the last epoch, the first event of
6149 the new epoch may be on ndb_apply_status. Its event data saved
6150 in record[0] would be overwritten here by a subsequent event on a
6151 normal table. So save and restore its record[0].
6152 */
6153 static const ulong sav_max= 512; // current is 284
6154 const ulong sav_len= apply_status_table->s->reclength;
6155 assert(sav_len <= sav_max);
6156 uchar sav_buf[sav_max];
6157 memcpy(sav_buf, apply_status_table->record[0], sav_len);
6158 empty_record(apply_status_table);
6159
6160 apply_status_table->field[0]->store((longlong)::server_id, true);
6161 apply_status_table->field[1]->store((longlong)gci_to_store, true);
6162 apply_status_table->field[2]->store("", 0, &my_charset_bin);
6163 apply_status_table->field[3]->store((longlong)0, true);
6164 apply_status_table->field[4]->store((longlong)0, true);
6165 #ifndef NDEBUG
6166 const LEX_STRING &name= apply_status_table->s->table_name;
6167 DBUG_PRINT("info", ("use_table: %.*s",
6168 (int) name.length, name.str));
6169 #endif
6170 injector::transaction::table tbl(apply_status_table, true);
6171 int ret = trans.use_table(::server_id, tbl);
6172 assert(ret == 0); NDB_IGNORE_VALUE(ret);
6173
6174 ret= trans.write_row(::server_id,
6175 injector::transaction::table(apply_status_table,
6176 true),
6177 &apply_status_table->s->all_set,
6178 apply_status_table->s->fields,
6179 apply_status_table->record[0]);
6180
6181 assert(ret == 0);
6182
6183 memcpy(apply_status_table->record[0], sav_buf, sav_len);
6184 DBUG_RETURN(true);
6185 }
6186
6187
6188 extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
6189 extern ulong opt_ndb_report_thresh_binlog_mem_usage;
6190 extern ulong opt_ndb_eventbuffer_max_alloc;
6191 extern uint opt_ndb_eventbuffer_free_percent;
6192
Ndb_binlog_thread()6193 Ndb_binlog_thread::Ndb_binlog_thread()
6194 : Ndb_component("Binlog")
6195 {
6196 }
6197
6198
~Ndb_binlog_thread()6199 Ndb_binlog_thread::~Ndb_binlog_thread()
6200 {
6201 }
6202
6203
do_wakeup()6204 void Ndb_binlog_thread::do_wakeup()
6205 {
6206 log_info("Wakeup");
6207
6208 /*
6209 The binlog thread is normally waiting for another
6210 event from the cluster with short timeout and should
6211 soon(within 1 second) detect that stop has been requested.
6212
6213 There are really no purpose(yet) to signal some condition
6214 trying to wake the thread up should it be waiting somewhere
6215 else since those waits are also short.
6216 */
6217 }
6218
6219
6220 void
do_run()6221 Ndb_binlog_thread::do_run()
6222 {
6223 THD *thd; /* needs to be first for thread_stack */
6224 Ndb *i_ndb= 0;
6225 Ndb *s_ndb= 0;
6226 Thd_ndb *thd_ndb=0;
6227 injector *inj= injector::instance();
6228 uint incident_id= 0;
6229 Global_THD_manager *thd_manager= Global_THD_manager::get_instance();
6230
6231 enum { BCCC_running, BCCC_exit, BCCC_restart } binlog_thread_state;
6232
6233 /**
6234 * If we get error after having reported incident
6235 * but before binlog started...we do "Restarting Cluster Binlog"
6236 * in that case, don't report incident again
6237 */
6238 bool do_incident = true;
6239
6240 DBUG_ENTER("ndb_binlog_thread");
6241
6242 native_mutex_lock(&injector_mutex);
6243
6244 log_info("Starting...");
6245
6246 thd= new THD; /* note that contructor of THD uses DBUG_ */
6247 THD_CHECK_SENTRY(thd);
6248
6249 /* We need to set thd->thread_id before thd->store_globals, or it will
6250 set an invalid value for thd->variables.pseudo_thread_id.
6251 */
6252 thd->set_new_thread_id();
6253
6254 thd->thread_stack= (char*) &thd; /* remember where our stack is */
6255 if (thd->store_globals())
6256 {
6257 delete thd;
6258 native_mutex_unlock(&injector_mutex);
6259 native_cond_signal(&injector_cond);
6260 DBUG_VOID_RETURN;
6261 }
6262
6263 thd_set_command(thd, COM_DAEMON);
6264 thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
6265 #ifndef NDB_THD_HAS_NO_VERSION
6266 thd->version= refresh_version;
6267 #endif
6268 thd->get_protocol_classic()->set_client_capabilities(0);
6269 thd->security_context()->skip_grants();
6270 // Create thd->net vithout vio
6271 thd->get_protocol_classic()->init_net((st_vio *) 0);
6272
6273 // Ndb binlog thread always use row format
6274 thd->set_current_stmt_binlog_format_row();
6275
6276 thd->real_id= my_thread_self();
6277 thd_manager->add_thd(thd);
6278 thd->lex->start_transaction_opt= 0;
6279
6280 log_info("Started");
6281
6282 Ndb_schema_dist_data schema_dist_data;
6283
6284 restart_cluster_failure:
6285 int have_injector_mutex_lock= 0;
6286 binlog_thread_state= BCCC_exit;
6287
6288 log_verbose(1, "Setting up");
6289
6290 if (!(thd_ndb= Thd_ndb::seize(thd)))
6291 {
6292 log_error("Creating Thd_ndb object failed");
6293 native_mutex_unlock(&injector_mutex);
6294 native_cond_signal(&injector_cond);
6295 goto err;
6296 }
6297
6298 if (!(s_ndb= new Ndb(g_ndb_cluster_connection, NDB_REP_DB)) ||
6299 s_ndb->setNdbObjectName("Ndb Binlog schema change monitoring") ||
6300 s_ndb->init())
6301 {
6302 log_error("Creating schema Ndb object failed");
6303 native_mutex_unlock(&injector_mutex);
6304 native_cond_signal(&injector_cond);
6305 goto err;
6306 }
6307 log_info("Created schema Ndb object, reference: 0x%x, name: '%s'",
6308 s_ndb->getReference(), s_ndb->getNdbObjectName());
6309
6310 // empty database
6311 if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
6312 i_ndb->setNdbObjectName("Ndb Binlog data change monitoring") ||
6313 i_ndb->init())
6314 {
6315 log_error("Creating injector Ndb object failed");
6316 native_mutex_unlock(&injector_mutex);
6317 native_cond_signal(&injector_cond);
6318 goto err;
6319 }
6320 log_info("Created injector Ndb object, reference: 0x%x, name: '%s'",
6321 i_ndb->getReference(), i_ndb->getNdbObjectName());
6322
6323 /* Set free percent event buffer needed to resume buffering */
6324 if (i_ndb->set_eventbuffer_free_percent(opt_ndb_eventbuffer_free_percent))
6325 {
6326 log_error("Setting ventbuffer free percent failed");
6327 native_mutex_unlock(&injector_mutex);
6328 native_cond_signal(&injector_cond);
6329 goto err;
6330 }
6331
6332 log_verbose(10, "Exposing global references");
6333 /*
6334 Expose global reference to our ndb object.
6335
6336 Used by both sql client thread and binlog thread to interact
6337 with the storage
6338 native_mutex_lock(&injector_mutex);
6339 */
6340 injector_thd= thd;
6341 injector_ndb= i_ndb;
6342 schema_ndb= s_ndb;
6343
6344 if (opt_bin_log && opt_ndb_log_bin)
6345 {
6346 ndb_binlog_running= TRUE;
6347 }
6348
6349 log_verbose(1, "Setup completed");
6350
6351 /* Thread start up completed */
6352 native_mutex_unlock(&injector_mutex);
6353 native_cond_signal(&injector_cond);
6354
6355 log_verbose(1, "Wait for server start completed");
6356 /*
6357 wait for mysql server to start (so that the binlog is started
6358 and thus can receive the first GAP event)
6359 */
6360 mysql_mutex_lock(&LOCK_server_started);
6361 while (!mysqld_server_started)
6362 {
6363 struct timespec abstime;
6364 set_timespec(&abstime, 1);
6365 mysql_cond_timedwait(&COND_server_started, &LOCK_server_started,
6366 &abstime);
6367 if (is_stop_requested())
6368 {
6369 mysql_mutex_unlock(&LOCK_server_started);
6370 goto err;
6371 }
6372 }
6373 mysql_mutex_unlock(&LOCK_server_started);
6374
6375 // Defer call of THD::init_for_query until after mysqld_server_started
6376 // to ensure that the parts of MySQL Server it uses has been created
6377 thd->init_for_queries();
6378
6379 log_verbose(1, "Check for incidents");
6380
6381 while (do_incident && ndb_binlog_running)
6382 {
6383 /*
6384 check if it is the first log, if so we do not insert a GAP event
6385 as there is really no log to have a GAP in
6386 */
6387 if (incident_id == 0)
6388 {
6389 LOG_INFO log_info;
6390 mysql_bin_log.get_current_log(&log_info);
6391 int len= (uint)strlen(log_info.log_file_name);
6392 uint no= 0;
6393 if ((sscanf(log_info.log_file_name + len - 6, "%u", &no) == 1) &&
6394 no == 1)
6395 {
6396 /* this is the fist log, so skip GAP event */
6397 break;
6398 }
6399 }
6400
6401 /*
6402 Always insert a GAP event as we cannot know what has happened
6403 in the cluster while not being connected.
6404 */
6405 LEX_STRING const msg[2]=
6406 {
6407 { C_STRING_WITH_LEN("mysqld startup") },
6408 { C_STRING_WITH_LEN("cluster disconnect")}
6409 };
6410 int ret = inj->record_incident(thd,
6411 binary_log::Incident_event::INCIDENT_LOST_EVENTS,
6412 msg[incident_id]);
6413 assert(ret == 0); NDB_IGNORE_VALUE(ret);
6414 do_incident = false; // Don't report incident again, unless we get started
6415 break;
6416 }
6417 incident_id= 1;
6418 {
6419 log_verbose(1, "Wait for cluster to start");
6420 thd->proc_info= "Waiting for ndbcluster to start";
6421
6422 native_mutex_lock(&injector_mutex);
6423 while (!ndb_schema_share ||
6424 (ndb_binlog_running && !ndb_apply_status_share) ||
6425 !ndb_binlog_tables_inited)
6426 {
6427 if (!thd_ndb->valid_ndb())
6428 {
6429 /*
6430 Cluster has gone away before setup was completed.
6431 Keep lock on injector_mutex to prevent further
6432 usage of the injector_ndb, and restart binlog
6433 thread to get rid of any garbage on the ndb objects
6434 */
6435 have_injector_mutex_lock= 1;
6436 binlog_thread_state= BCCC_restart;
6437 goto err;
6438 }
6439 /* ndb not connected yet */
6440 struct timespec abstime;
6441 set_timespec(&abstime, 1);
6442 native_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
6443 if (is_stop_requested())
6444 {
6445 native_mutex_unlock(&injector_mutex);
6446 goto err;
6447 }
6448 if (thd->killed == THD::KILL_CONNECTION)
6449 {
6450 /*
6451 Since the ndb binlog thread adds itself to the "global thread list"
6452 it need to look at the "killed" flag and stop the thread to avoid
6453 that the server hangs during shutdown while waiting for the "global
6454 thread list" to be emtpy.
6455 */
6456 sql_print_information("NDB Binlog: Server shutdown detected while "
6457 "waiting for ndbcluster to start...");
6458 native_mutex_unlock(&injector_mutex);
6459 goto err;
6460 }
6461 }
6462 native_mutex_unlock(&injector_mutex);
6463
6464 assert(ndbcluster_hton->slot != ~(uint)0);
6465 thd_set_thd_ndb(thd, thd_ndb);
6466 thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
6467 thd->query_id= 0; // to keep valgrind quiet
6468 }
6469
6470 schema_dist_data.init(g_ndb_cluster_connection);
6471
6472 {
6473 log_verbose(1, "Wait for first event");
6474 // wait for the first event
6475 thd->proc_info= "Waiting for first event from ndbcluster";
6476 int schema_res, res;
6477 Uint64 schema_gci;
6478 do
6479 {
6480 DBUG_PRINT("info", ("Waiting for the first event"));
6481
6482 if (is_stop_requested())
6483 goto err;
6484
6485 schema_res= s_ndb->pollEvents(100, &schema_gci);
6486 } while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci);
6487 if (ndb_binlog_running)
6488 {
6489 Uint64 gci= i_ndb->getLatestGCI();
6490 while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch)
6491 {
6492 if (is_stop_requested())
6493 goto err;
6494 res= i_ndb->pollEvents(10, &gci);
6495 }
6496 if (gci > schema_gci)
6497 {
6498 schema_gci= gci;
6499 }
6500 }
6501 // now check that we have epochs consistant with what we had before the restart
6502 DBUG_PRINT("info", ("schema_res: %d schema_gci: %u/%u", schema_res,
6503 (uint)(schema_gci >> 32),
6504 (uint)(schema_gci)));
6505 {
6506 i_ndb->flushIncompleteEvents(schema_gci);
6507 s_ndb->flushIncompleteEvents(schema_gci);
6508 if (schema_gci < ndb_latest_handled_binlog_epoch)
6509 {
6510 sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
6511 "ndb_latest_handled_binlog_epoch: %u/%u, while current epoch: %u/%u. "
6512 "RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.",
6513 (uint)(ndb_latest_handled_binlog_epoch >> 32),
6514 (uint)(ndb_latest_handled_binlog_epoch),
6515 (uint)(schema_gci >> 32),
6516 (uint)(schema_gci));
6517 ndb_set_latest_trans_gci(0);
6518 ndb_latest_handled_binlog_epoch= 0;
6519 ndb_latest_applied_binlog_epoch= 0;
6520 ndb_latest_received_binlog_epoch= 0;
6521 ndb_index_stat_restart();
6522 }
6523 else if (ndb_latest_applied_binlog_epoch > 0)
6524 {
6525 sql_print_warning("NDB Binlog: cluster has reconnected. "
6526 "Changes to the database that occured while "
6527 "disconnected will not be in the binlog");
6528 }
6529 if (opt_ndb_extra_logging)
6530 {
6531 sql_print_information("NDB Binlog: starting log at epoch %u/%u",
6532 (uint)(schema_gci >> 32),
6533 (uint)(schema_gci));
6534 }
6535 }
6536 log_verbose(1, "Got first event");
6537 }
6538 /*
6539 binlog thread is ready to receive events
6540 - client threads may now start updating data, i.e. tables are
6541 no longer read only
6542 */
6543 ndb_binlog_is_ready= TRUE;
6544
6545 if (opt_ndb_extra_logging)
6546 sql_print_information("NDB Binlog: ndb tables writable");
6547 ndb_tdc_close_cached_tables();
6548
6549 /*
6550 Signal any waiting thread that ndb table setup is
6551 now complete
6552 */
6553 ndb_notify_tables_writable();
6554
6555 {
6556 static LEX_CSTRING db_lex_cstr= EMPTY_CSTR;
6557 thd->reset_db(db_lex_cstr);
6558 }
6559
6560 log_verbose(1, "Startup and setup completed");
6561
6562 /*
6563 Main NDB Injector loop
6564 */
6565 do_incident = true; // If we get disconnected again...do incident report
6566 binlog_thread_state= BCCC_running;
6567 for ( ; !((is_stop_requested() ||
6568 binlog_thread_state) &&
6569 ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci()) &&
6570 binlog_thread_state != BCCC_restart; )
6571 {
6572 #ifndef NDEBUG
6573 if (binlog_thread_state)
6574 {
6575 DBUG_PRINT("info", ("binlog_thread_state: %d, "
6576 "ndb_latest_handled_binlog_epoch: %u/%u, "
6577 "*get_latest_trans_gci(): %u/%u",
6578 binlog_thread_state,
6579 (uint)(ndb_latest_handled_binlog_epoch >> 32),
6580 (uint)(ndb_latest_handled_binlog_epoch),
6581 (uint)(ndb_get_latest_trans_gci() >> 32),
6582 (uint)(ndb_get_latest_trans_gci())));
6583 }
6584 #endif
6585
6586 /*
6587 now we don't want any events before next gci is complete
6588 */
6589 thd->proc_info= "Waiting for event from ndbcluster";
6590 thd->set_time();
6591
6592 /* wait for event or 1000 ms */
6593 Uint64 gci= 0, schema_gci;
6594 int res= 0, tot_poll_wait= 1000;
6595
6596 if (ndb_binlog_running)
6597 {
6598 // Capture any dynamic changes to max_alloc
6599 i_ndb->set_eventbuf_max_alloc(opt_ndb_eventbuffer_max_alloc);
6600
6601 res= i_ndb->pollEvents(tot_poll_wait, &gci);
6602 tot_poll_wait= 0;
6603 }
6604 int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
6605 ndb_latest_received_binlog_epoch= gci;
6606
6607 while (gci > schema_gci && schema_res >= 0)
6608 {
6609 static char buf[64];
6610 thd->proc_info= "Waiting for schema epoch";
6611 my_snprintf(buf, sizeof(buf), "%s %u/%u(%u/%u)", thd->proc_info,
6612 (uint)(schema_gci >> 32),
6613 (uint)(schema_gci),
6614 (uint)(gci >> 32),
6615 (uint)(gci));
6616 thd->proc_info= buf;
6617 schema_res= s_ndb->pollEvents(10, &schema_gci);
6618 }
6619
6620 if ((is_stop_requested() ||
6621 binlog_thread_state) &&
6622 (ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci() ||
6623 !ndb_binlog_running))
6624 break; /* Stopping thread */
6625
6626 if (thd->killed == THD::KILL_CONNECTION)
6627 {
6628 /*
6629 Since the ndb binlog thread adds itself to the "global thread list"
6630 it need to look at the "killed" flag and stop the thread to avoid
6631 that the server hangs during shutdown while waiting for the "global
6632 thread list" to be emtpy.
6633 In pre 5.6 versions the thread was also added to "global thread
6634 list" but the "global thread *count*" variable was not incremented
6635 and thus the same problem didn't exist.
6636 The only reason for adding the ndb binlog thread to "global thread
6637 list" is to be able to see the thread state using SHOW PROCESSLIST
6638 and I_S.PROCESSLIST
6639 */
6640 sql_print_information("NDB Binlog: Server shutdown detected...");
6641 break;
6642 }
6643
6644 MEM_ROOT **root_ptr= my_thread_get_THR_MALLOC();
6645 MEM_ROOT *old_root= *root_ptr;
6646 MEM_ROOT mem_root;
6647 init_sql_alloc(PSI_INSTRUMENT_ME, &mem_root, 4096, 0);
6648
6649 // The Ndb_schema_event_handler does not necessarily need
6650 // to use the same memroot(or vice versa)
6651 Ndb_schema_event_handler
6652 schema_event_handler(thd, &mem_root,
6653 g_ndb_cluster_connection->node_id(),
6654 schema_dist_data);
6655
6656 *root_ptr= &mem_root;
6657
6658 if (unlikely(schema_res > 0))
6659 {
6660 thd->proc_info= "Processing events from schema table";
6661 g_ndb_log_slave_updates= opt_log_slave_updates;
6662 s_ndb->
6663 setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
6664 s_ndb->
6665 setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
6666 NdbEventOperation *pOp= s_ndb->nextEvent();
6667 while (pOp != NULL)
6668 {
6669 if (!pOp->hasError())
6670 {
6671 schema_event_handler.handle_event(s_ndb, pOp);
6672
6673 DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
6674 s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
6675 "<empty>"));
6676 DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
6677 i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
6678 "<empty>"));
6679 if (i_ndb->getEventOperation() == NULL &&
6680 s_ndb->getEventOperation() == NULL &&
6681 binlog_thread_state == BCCC_running)
6682 {
6683 DBUG_PRINT("info", ("binlog_thread_state= BCCC_restart"));
6684 binlog_thread_state= BCCC_restart;
6685 if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
6686 {
6687 sql_print_error("NDB Binlog: latest transaction in epoch %u/%u not in binlog "
6688 "as latest received epoch is %u/%u",
6689 (uint)(ndb_get_latest_trans_gci() >> 32),
6690 (uint)(ndb_get_latest_trans_gci()),
6691 (uint)(ndb_latest_received_binlog_epoch >> 32),
6692 (uint)(ndb_latest_received_binlog_epoch));
6693 }
6694 }
6695 }
6696 else
6697 sql_print_error("NDB: error %lu (%s) on handling "
6698 "binlog schema event",
6699 (ulong) pOp->getNdbError().code,
6700 pOp->getNdbError().message);
6701 pOp= s_ndb->nextEvent();
6702 }
6703 updateInjectorStats(s_ndb, i_ndb);
6704 }
6705
6706 if (!ndb_binlog_running)
6707 {
6708 /*
6709 Just consume any events, not used if no binlogging
6710 e.g. node failure events
6711 */
6712 Uint64 tmp_gci;
6713 if (i_ndb->pollEvents(0, &tmp_gci))
6714 {
6715 NdbEventOperation *pOp;
6716 while ((pOp= i_ndb->nextEvent()))
6717 {
6718 if ((unsigned) pOp->getEventType() >=
6719 (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
6720 {
6721 ndb_binlog_index_row row;
6722 handle_non_data_event(thd, pOp, row);
6723 }
6724 }
6725 if (i_ndb->getEventOperation() == NULL &&
6726 s_ndb->getEventOperation() == NULL &&
6727 binlog_thread_state == BCCC_running)
6728 {
6729 DBUG_PRINT("info", ("binlog_thread_state= BCCC_restart"));
6730 binlog_thread_state= BCCC_restart;
6731 }
6732 }
6733 updateInjectorStats(s_ndb, i_ndb);
6734 }
6735 else if (res > 0 ||
6736 (ndb_log_empty_epochs() &&
6737 gci > ndb_latest_handled_binlog_epoch))
6738 {
6739 DBUG_PRINT("info", ("pollEvents res: %d", res));
6740 thd->proc_info= "Processing events";
6741 NdbEventOperation *pOp= i_ndb->nextEvent();
6742 ndb_binlog_index_row _row;
6743 ndb_binlog_index_row *rows= &_row;
6744 injector::transaction trans;
6745 unsigned trans_row_count= 0;
6746 unsigned trans_slave_row_count= 0;
6747 if (!pOp)
6748 {
6749 /*
6750 Must be an empty epoch since the condition
6751 (ndb_log_empty_epochs() &&
6752 gci > ndb_latest_handled_binlog_epoch)
6753 must be true we write empty epoch into
6754 ndb_binlog_index
6755 */
6756 DBUG_PRINT("info", ("Writing empty epoch for gci %llu", gci));
6757 DBUG_PRINT("info", ("Initializing transaction"));
6758 inj->new_trans(thd, &trans);
6759 rows= &_row;
6760 memset(&_row, 0, sizeof(_row));
6761 thd->variables.character_set_client= &my_charset_latin1;
6762 goto commit_to_binlog;
6763 }
6764 while (pOp != NULL)
6765 {
6766 rows= &_row;
6767 gci= pOp->getGCI();
6768 DBUG_PRINT("info", ("Handling gci: %u/%u",
6769 (uint)(gci >> 32),
6770 (uint)(gci)));
6771 // sometimes get TE_ALTER with invalid table
6772 assert(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
6773 ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
6774 assert(gci <= ndb_latest_received_binlog_epoch);
6775
6776 /* Update our thread-local debug settings based on the global */
6777 #ifndef NDEBUG
6778 /* Get value of global...*/
6779 {
6780 char buf[256];
6781 DBUG_EXPLAIN_INITIAL(buf, sizeof(buf));
6782 // fprintf(stderr, "Ndb Binlog Injector, setting debug to %s\n",
6783 // buf);
6784 DBUG_SET(buf);
6785 }
6786 #endif
6787
6788 /* initialize some variables for this epoch */
6789
6790 i_ndb->set_eventbuf_max_alloc(opt_ndb_eventbuffer_max_alloc);
6791 g_ndb_log_slave_updates= opt_log_slave_updates;
6792 i_ndb->
6793 setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
6794 i_ndb->setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
6795
6796 memset(&_row, 0, sizeof(_row));
6797 thd->variables.character_set_client= &my_charset_latin1;
6798 DBUG_PRINT("info", ("Initializing transaction"));
6799 inj->new_trans(thd, &trans);
6800 trans_row_count= 0;
6801 trans_slave_row_count= 0;
6802 // pass table map before epoch
6803 {
6804 Uint32 iter= 0;
6805 const NdbEventOperation *gci_op;
6806 Uint32 event_types;
6807
6808 if (!i_ndb->isConsistentGCI(gci))
6809 {
6810 char errmsg[64];
6811 uint end= sprintf(&errmsg[0],
6812 "Detected missing data in GCI %llu, "
6813 "inserting GAP event", gci);
6814 errmsg[end]= '\0';
6815 DBUG_PRINT("info",
6816 ("Detected missing data in GCI %llu, "
6817 "inserting GAP event", gci));
6818 LEX_STRING const msg= { C_STRING_WITH_LEN(errmsg) };
6819 inj->record_incident(thd,
6820 binary_log::Incident_event::INCIDENT_LOST_EVENTS,
6821 msg);
6822 }
6823 while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
6824 != NULL)
6825 {
6826 Ndb_event_data *event_data=
6827 (Ndb_event_data *) gci_op->getCustomData();
6828 NDB_SHARE *share= (event_data)?event_data->share:NULL;
6829 DBUG_PRINT("info", ("per gci_op: 0x%lx share: 0x%lx event_types: 0x%x",
6830 (long) gci_op, (long) share, event_types));
6831 // workaround for interface returning TE_STOP events
6832 // which are normally filtered out below in the nextEvent loop
6833 if ((event_types & ~NdbDictionary::Event::TE_STOP) == 0)
6834 {
6835 DBUG_PRINT("info", ("Skipped TE_STOP on table %s",
6836 gci_op->getEvent()->getTable()->getName()));
6837 continue;
6838 }
6839 // this should not happen
6840 if (share == NULL || event_data->shadow_table == NULL)
6841 {
6842 DBUG_PRINT("info", ("no share or table %s!",
6843 gci_op->getEvent()->getTable()->getName()));
6844 continue;
6845 }
6846 if (share == ndb_apply_status_share)
6847 {
6848 // skip this table, it is handled specially
6849 continue;
6850 }
6851 TABLE *table= event_data->shadow_table;
6852 #ifndef NDEBUG
6853 const LEX_STRING &name= table->s->table_name;
6854 #endif
6855 if ((event_types & (NdbDictionary::Event::TE_INSERT |
6856 NdbDictionary::Event::TE_UPDATE |
6857 NdbDictionary::Event::TE_DELETE)) == 0)
6858 {
6859 DBUG_PRINT("info", ("skipping non data event table: %.*s",
6860 (int) name.length, name.str));
6861 continue;
6862 }
6863 if (!trans.good())
6864 {
6865 DBUG_PRINT("info",
6866 ("Found new data event, initializing transaction"));
6867 inj->new_trans(thd, &trans);
6868 }
6869 DBUG_PRINT("info", ("use_table: %.*s, cols %u",
6870 (int) name.length, name.str,
6871 table->s->fields));
6872 injector::transaction::table tbl(table, true);
6873 int ret = trans.use_table(::server_id, tbl);
6874 assert(ret == 0); NDB_IGNORE_VALUE(ret);
6875 }
6876 }
6877 if (trans.good())
6878 {
6879 /* Inject ndb_apply_status WRITE_ROW event */
6880 if (!injectApplyStatusWriteRow(trans,
6881 gci))
6882 {
6883 sql_print_error("NDB Binlog: Failed to inject apply status write row");
6884 }
6885 }
6886
6887 do
6888 {
6889 if (pOp->hasError() &&
6890 handle_error(pOp) < 0)
6891 goto err;
6892
6893 #ifndef NDEBUG
6894 {
6895 Ndb_event_data *event_data=
6896 (Ndb_event_data *) pOp->getCustomData();
6897 NDB_SHARE *share= (event_data)?event_data->share:NULL;
6898 DBUG_PRINT("info",
6899 ("EVENT TYPE: %d GCI: %u/%u last applied: %u/%u "
6900 "share: 0x%lx (%s.%s)", pOp->getEventType(),
6901 (uint)(gci >> 32),
6902 (uint)(gci),
6903 (uint)(ndb_latest_applied_binlog_epoch >> 32),
6904 (uint)(ndb_latest_applied_binlog_epoch),
6905 (long) share,
6906 share ? share->db : "'NULL'",
6907 share ? share->table_name : "'NULL'"));
6908 assert(share != 0);
6909 }
6910 // assert that there is consistancy between gci op list
6911 // and event list
6912 {
6913 Uint32 iter= 0;
6914 const NdbEventOperation *gci_op;
6915 Uint32 event_types;
6916 while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
6917 != NULL)
6918 {
6919 if (gci_op == pOp)
6920 break;
6921 }
6922 assert(gci_op == pOp);
6923 assert((event_types & pOp->getEventType()) != 0);
6924 }
6925 #endif
6926
6927 if ((unsigned) pOp->getEventType() <
6928 (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
6929 handle_data_event(thd, i_ndb, pOp, &rows, trans,
6930 trans_row_count, trans_slave_row_count);
6931 else
6932 {
6933 handle_non_data_event(thd, pOp, *rows);
6934 DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
6935 s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
6936 "<empty>"));
6937 DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
6938 i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
6939 "<empty>"));
6940 if (i_ndb->getEventOperation() == NULL &&
6941 s_ndb->getEventOperation() == NULL &&
6942 binlog_thread_state == BCCC_running)
6943 {
6944 DBUG_PRINT("info", ("binlog_thread_state= BCCC_restart"));
6945 binlog_thread_state= BCCC_restart;
6946 if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
6947 {
6948 sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
6949 "as latest received epoch is %lu",
6950 (ulong) ndb_get_latest_trans_gci(),
6951 (ulong) ndb_latest_received_binlog_epoch);
6952 }
6953 }
6954 }
6955
6956 // Capture any dynamic changes to max_alloc
6957 i_ndb->set_eventbuf_max_alloc(opt_ndb_eventbuffer_max_alloc);
6958
6959 pOp= i_ndb->nextEvent();
6960 } while (pOp && pOp->getGCI() == gci);
6961
6962 updateInjectorStats(s_ndb, i_ndb);
6963
6964 /*
6965 note! pOp is not referring to an event in the next epoch
6966 or is == 0
6967 */
6968
6969 while (trans.good())
6970 {
6971 commit_to_binlog:
6972 if (!ndb_log_empty_epochs())
6973 {
6974 /*
6975 If
6976 - We did not add any 'real' rows to the Binlog AND
6977 - We did not apply any slave row updates, only
6978 ndb_apply_status updates
6979 THEN
6980 Don't write the Binlog transaction which just
6981 contains ndb_apply_status updates.
6982 (For cicular rep with log_apply_status, ndb_apply_status
6983 updates will propagate while some related, real update
6984 is propagating)
6985 */
6986 if ((trans_row_count == 0) &&
6987 (! (opt_ndb_log_apply_status &&
6988 trans_slave_row_count) ))
6989 {
6990 /* nothing to commit, rollback instead */
6991 if (int r= trans.rollback())
6992 {
6993 sql_print_error("NDB Binlog: "
6994 "Error during ROLLBACK of GCI %u/%u. Error: %d",
6995 uint(gci >> 32), uint(gci), r);
6996 /* TODO: Further handling? */
6997 }
6998 break;
6999 }
7000 }
7001 thd->proc_info= "Committing events to binlog";
7002 if (int r= trans.commit())
7003 {
7004 sql_print_error("NDB Binlog: "
7005 "Error during COMMIT of GCI. Error: %d",
7006 r);
7007 /* TODO: Further handling? */
7008 }
7009 injector::transaction::binlog_pos start= trans.start_pos();
7010 injector::transaction::binlog_pos next = trans.next_pos();
7011 rows->gci= (Uint32)(gci >> 32); // Expose gci hi/lo
7012 rows->epoch= gci;
7013 rows->start_master_log_file= start.file_name();
7014 rows->start_master_log_pos= start.file_pos();
7015 if ((next.file_pos() == 0) &&
7016 ndb_log_empty_epochs())
7017 {
7018 /* Empty transaction 'committed' due to log_empty_epochs
7019 * therefore no next position
7020 */
7021 rows->next_master_log_file= start.file_name();
7022 rows->next_master_log_pos= start.file_pos();
7023 }
7024 else
7025 {
7026 rows->next_master_log_file= next.file_name();
7027 rows->next_master_log_pos= next.file_pos();
7028 }
7029
7030 DBUG_PRINT("info", ("COMMIT gci: %lu", (ulong) gci));
7031 if (opt_ndb_log_binlog_index)
7032 {
7033 if (ndb_binlog_index_table__write_rows(thd, rows))
7034 {
7035 /*
7036 Writing to ndb_binlog_index failed, check if we are
7037 being killed and retry
7038 */
7039 if (thd->killed)
7040 {
7041 DBUG_PRINT("error", ("Failed to write to ndb_binlog_index at shutdown, retrying"));
7042 mysql_mutex_lock(&thd->LOCK_thd_data);
7043 const THD::killed_state save_killed= thd->killed;
7044 /* We are cleaning up, allow for flushing last epoch */
7045 thd->killed= THD::NOT_KILLED;
7046 /* also clear error from last failing write */
7047 thd->clear_error();
7048 ndb_binlog_index_table__write_rows(thd, rows);
7049 /* Restore kill flag */
7050 thd->killed= save_killed;
7051 mysql_mutex_unlock(&thd->LOCK_thd_data);
7052 }
7053 }
7054 }
7055 ndb_latest_applied_binlog_epoch= gci;
7056 break;
7057 }
7058 ndb_latest_handled_binlog_epoch= gci;
7059 }
7060
7061 if(!i_ndb->isConsistent(gci))
7062 {
7063 char errmsg[64];
7064 uint end= sprintf(&errmsg[0],
7065 "Detected missing data in GCI %llu, "
7066 "inserting GAP event", gci);
7067 errmsg[end]= '\0';
7068 DBUG_PRINT("info",
7069 ("Detected missing data in GCI %llu, "
7070 "inserting GAP event", gci));
7071 LEX_STRING const msg= { C_STRING_WITH_LEN(errmsg) };
7072 inj->record_incident(thd,
7073 binary_log::Incident_event::INCIDENT_LOST_EVENTS,
7074 msg);
7075 }
7076 }
7077
7078 // Notify the schema event handler about post_epoch so it may finish
7079 // any outstanding business
7080 schema_event_handler.post_epoch();
7081
7082 free_root(&mem_root, MYF(0));
7083 *root_ptr= old_root;
7084 ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
7085 }
7086 err:
7087 if (binlog_thread_state != BCCC_restart)
7088 {
7089 log_info("Shutting down");
7090 thd->proc_info= "Shutting down";
7091 }
7092 else
7093 {
7094 log_info("Restarting");
7095 thd->proc_info= "Restarting";
7096 }
7097 if (!have_injector_mutex_lock)
7098 native_mutex_lock(&injector_mutex);
7099 /* don't mess with the injector_ndb anymore from other threads */
7100 injector_thd= 0;
7101 injector_ndb= 0;
7102 schema_ndb= 0;
7103 native_mutex_unlock(&injector_mutex);
7104 thd->reset_db(NULL_CSTR); // as not to try to free memory
7105
7106 /*
7107 This will cause the util thread to start to try to initialize again
7108 via ndbcluster_setup_binlog_table_shares. But since injector_ndb is
7109 set to NULL it will not succeed until injector_ndb is reinitialized.
7110 */
7111 ndb_binlog_tables_inited= FALSE;
7112
7113 if (ndb_apply_status_share)
7114 {
7115 /* ndb_share reference binlog extra free */
7116 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
7117 ndb_apply_status_share->key_string(),
7118 ndb_apply_status_share->use_count));
7119 free_share(&ndb_apply_status_share);
7120 ndb_apply_status_share= 0;
7121 }
7122 if (ndb_schema_share)
7123 {
7124 /* begin protect ndb_schema_share */
7125 native_mutex_lock(&ndb_schema_share_mutex);
7126 /* ndb_share reference binlog extra free */
7127 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
7128 ndb_schema_share->key_string(),
7129 ndb_schema_share->use_count));
7130 free_share(&ndb_schema_share);
7131 ndb_schema_share= 0;
7132 native_mutex_unlock(&ndb_schema_share_mutex);
7133 /* end protect ndb_schema_share */
7134 }
7135
7136 /* remove all event operations */
7137 if (s_ndb)
7138 {
7139 remove_event_operations(s_ndb);
7140 delete s_ndb;
7141 s_ndb= 0;
7142 }
7143 if (i_ndb)
7144 {
7145 remove_event_operations(i_ndb);
7146 delete i_ndb;
7147 i_ndb= 0;
7148 }
7149
7150 if (thd_ndb)
7151 {
7152 Thd_ndb::release(thd_ndb);
7153 thd_set_thd_ndb(thd, NULL);
7154 thd_ndb= NULL;
7155 }
7156
7157 /**
7158 * release all extra references from tables
7159 */
7160 {
7161 if (opt_ndb_extra_logging > 9)
7162 sql_print_information("NDB Binlog: Release extra share references");
7163
7164 native_mutex_lock(&ndbcluster_mutex);
7165 for (uint i= 0; i < ndbcluster_open_tables.records;)
7166 {
7167 NDB_SHARE * share = (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables,
7168 i);
7169 if (share->state != NSS_DROPPED)
7170 {
7171 /*
7172 The share kept by the server has not been freed, free it
7173 */
7174 ndbcluster_mark_share_dropped(share);
7175 /* ndb_share reference create free */
7176 DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
7177 share->key_string(), share->use_count));
7178 free_share(&share, TRUE);
7179
7180 /**
7181 * This might have altered hash table...not sure if it's stable..
7182 * so we'll restart instead
7183 */
7184 i = 0;
7185 }
7186 else
7187 {
7188 i++;
7189 }
7190 }
7191 native_mutex_unlock(&ndbcluster_mutex);
7192 }
7193 log_info("Stopping...");
7194
7195 ndb_tdc_close_cached_tables();
7196 if (opt_ndb_extra_logging > 15)
7197 {
7198 sql_print_information("NDB Binlog: remaining open tables: ");
7199 for (uint i= 0; i < ndbcluster_open_tables.records; i++)
7200 {
7201 NDB_SHARE* share = (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables,i);
7202 sql_print_information(" %s.%s state: %u use_count: %u",
7203 share->db,
7204 share->table_name,
7205 (uint)share->state,
7206 share->use_count);
7207 }
7208 }
7209
7210 schema_dist_data.release();
7211
7212 if (binlog_thread_state == BCCC_restart)
7213 {
7214 native_mutex_lock(&injector_mutex);
7215 goto restart_cluster_failure;
7216 }
7217
7218 // Release the thd->net created without vio
7219 thd->get_protocol_classic()->end_net();
7220 thd->release_resources();
7221 thd_manager->remove_thd(thd);
7222 delete thd;
7223
7224 ndb_binlog_running= FALSE;
7225 (void) native_cond_signal(&injector_cond);
7226
7227 log_info("Stopped");
7228
7229 DBUG_PRINT("exit", ("ndb_binlog_thread"));
7230 DBUG_VOID_RETURN;
7231 }
7232
7233
7234 /*
7235 Return string containing current status of ndb binlog as
7236 comma separated name value pairs.
7237
7238 Used by ndbcluster_show_status() to fill the "binlog" row
7239 in result of SHOW ENGINE NDB STATUS
7240
7241 @param buf The buffer where to print status string
  @param buf_size Size of the buffer
7243
7244 @return Length of the string printed to "buf" or 0 if no string
7245 is printed
7246 */
7247
7248 size_t
ndbcluster_show_status_binlog(char * buf,size_t buf_size)7249 ndbcluster_show_status_binlog(char *buf, size_t buf_size)
7250 {
7251 DBUG_ENTER("ndbcluster_show_status_binlog");
7252
7253 native_mutex_lock(&injector_mutex);
7254 if (injector_ndb)
7255 {
7256 const ulonglong latest_epoch= injector_ndb->getLatestGCI();
7257 native_mutex_unlock(&injector_mutex);
7258
7259 // Get highest trans gci seen by the cluster connections
7260 const ulonglong latest_trans_epoch = ndb_get_latest_trans_gci();
7261
7262 const size_t buf_len =
7263 my_snprintf(buf, buf_size,
7264 "latest_epoch=%llu, "
7265 "latest_trans_epoch=%llu, "
7266 "latest_received_binlog_epoch=%llu, "
7267 "latest_handled_binlog_epoch=%llu, "
7268 "latest_applied_binlog_epoch=%llu",
7269 latest_epoch,
7270 latest_trans_epoch,
7271 ndb_latest_received_binlog_epoch,
7272 ndb_latest_handled_binlog_epoch,
7273 ndb_latest_applied_binlog_epoch);
7274 DBUG_RETURN(buf_len);
7275 }
7276 else
7277 native_mutex_unlock(&injector_mutex);
7278 DBUG_RETURN(0);
7279 }
7280
7281
#ifdef NDB_WITHOUT_SERVER_ID_BITS

/* No --server-id-bits=<bits> -> implement constant opt_server_id_mask.
   All bits set: the whole server_id is significant when the server is
   built without configurable server-id-bits support. */
ulong opt_server_id_mask = ~0;

#endif
7288