1 /*
2 Copyright (c) 2006, 2016, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "ha_ndbcluster_glue.h"
26
27 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
28 #include "ha_ndbcluster.h"
29 #include "ha_ndbcluster_connection.h"
30 #include "ndb_local_connection.h"
31 #include "ndb_thd.h"
32 #include "ndb_table_guard.h"
33 #include "ndb_global_schema_lock.h"
34 #include "ndb_global_schema_lock_guard.h"
35
36 #include "rpl_injector.h"
37 #include "rpl_filter.h"
38 #if MYSQL_VERSION_ID > 50600
39 #include "rpl_slave.h"
40 #else
41 #include "slave.h"
42 #include "log_event.h"
43 #endif
#include "global_threads.h"
#include "ha_ndbcluster_binlog.h"
#include <ndbapi/NdbDictionary.hpp>
#include <ndbapi/ndb_cluster_connection.hpp>
#include <time.h>
48
/*
  Options and shared state owned by other translation units
  (presumably the --ndb-log-* server options; confirm in ha_ndbcluster.cc).
*/
extern my_bool opt_ndb_log_orig;
extern my_bool opt_ndb_log_bin;
extern my_bool opt_ndb_log_update_as_write;
extern my_bool opt_ndb_log_updated_only;
extern my_bool opt_ndb_log_binlog_index;
extern my_bool opt_ndb_log_apply_status;
extern ulong opt_ndb_extra_logging;
extern st_ndb_slave_state g_ndb_slave_state;

/* Implemented elsewhere */
bool ndb_log_empty_epochs(void);
59
60 /*
61 defines for cluster replication table names
62 */
63 #include "ha_ndbcluster_tables.h"
64
65 #include "ndb_dist_priv_util.h"
66
/*
  Timeout for syncing schema events between
  mysql servers, and between mysql server and the binlog.
  NOTE(review): unit not visible here, presumably seconds; confirm at
  the use site.
*/
static const int DEFAULT_SYNC_TIMEOUT= 120;

/*
  State of the ndb injector (binlog) thread:
   1  if it is running
  -1  if it was started but later stopped for some reason
   0  if never started
  ndbcluster_binlog_end() waits under injector_mutex until this is <= 0.
*/
static int ndb_binlog_thread_running= 0;
/*
  TRUE if the ndb binlog should be written, FALSE if not
*/
my_bool ndb_binlog_running= FALSE;
/* TRUE once the ndb_* system tables have been set up */
static my_bool ndb_binlog_tables_inited= FALSE;
/*
  TRUE when the binlog thread is ready to write to the binlog; FALSE while
  it is still initializing or has lost connection
*/
static my_bool ndb_binlog_is_ready= FALSE;
86
87 bool
ndb_binlog_is_read_only(void)88 ndb_binlog_is_read_only(void)
89 {
90 if(!ndb_binlog_tables_inited)
91 {
92 /* the ndb_* system tables not setup yet */
93 return true;
94 }
95
96 if (ndb_binlog_running && !ndb_binlog_is_ready)
97 {
98 /*
99 The binlog thread is supposed to write to binlog
100 but not ready (still initializing or has lost connection)
101 */
102 return true;
103 }
104 return false;
105 }
106
/*
  Global reference to the ndb injector thread THD object.

  Used for setting the TABLE::in_use member, e.g. in get_share(...) and
  for the shadow table in ndb_binlog_open_shadow_table().
*/
extern THD * injector_thd; // Declared in ha_ndbcluster.cc

/*
  Global reference to the ndb injector thread's Ndb object.

  Used mainly by the binlog index thread, but exposed to the client sql
  thread for one reason; to setup the events operations for a table
  to enable ndb injector thread receiving events.

  Must therefore always be used with a surrounding
  pthread_mutex_lock(&injector_mutex), when doing create/dropEventOperation
*/
static Ndb *injector_ndb= 0;
/* Ndb object used for schema events (inferred from name; confirm) */
static Ndb *schema_ndb= 0;

/*
  Non-zero while binlog support is initialized. Cleared by
  ndbcluster_binlog_end(), which then destroys injector_mutex,
  injector_cond and ndb_schema_share_mutex.
*/
static int ndbcluster_binlog_inited= 0;
/*
  Flag "ndbcluster_binlog_terminating" set when shutting down mysqld.
  Server main loop should call handlerton function:

  ndbcluster_hton->binlog_func ==
  ndbcluster_binlog_func(...,BFN_BINLOG_END,...) ==
  ndbcluster_binlog_end

  at shutdown, which sets the flag. And then server needs to wait for it
  to complete. Otherwise binlog will not be complete.

  ndbcluster_hton->panic == ndbcluster_end() will not return until
  ndb binlog is completed
*/
static int ndbcluster_binlog_terminating= 0;

/*
  Mutex and condition used for interacting between client sql thread
  and injector thread. injector_cond is shared by several waiters:
  ndbcluster_binlog_wait() (epoch progress) and ndbcluster_binlog_end()
  (thread shutdown).
*/
pthread_t ndb_binlog_thread;
pthread_mutex_t injector_mutex;
pthread_cond_t injector_cond;

/*
  NDB Injector thread (used for binlog creation) epoch bookkeeping.
  ndb_latest_handled_binlog_epoch is read under injector_mutex by
  ndbcluster_binlog_wait().
*/
static ulonglong ndb_latest_applied_binlog_epoch= 0;
static ulonglong ndb_latest_handled_binlog_epoch= 0;
static ulonglong ndb_latest_received_binlog_epoch= 0;

/*
  Shares for mysql.ndb_apply_status and mysql.ndb_schema once set up;
  while NULL, ndbcluster_binlog_init_share() forces event operations
  for those tables.
*/
NDB_SHARE *ndb_apply_status_share= 0;
NDB_SHARE *ndb_schema_share= 0;
/* Presumably protects ndb_schema_share (confirm); destroyed in ndbcluster_binlog_end() */
pthread_mutex_t ndb_schema_share_mutex;

extern my_bool opt_log_slave_updates;
static my_bool g_ndb_log_slave_updates;
164
/* Schema object distribution handling */
HASH ndb_schema_objects;
/*
  Per-object state used while distributing a schema operation.
  NOTE(review): field meanings below are inferred from names; confirm
  against ndb_get_schema_object()/ndb_free_schema_object().
*/
typedef struct st_ndb_schema_object {
  pthread_mutex_t mutex;
  char *key;                  // presumably the key in ndb_schema_objects
  uint key_length;
  uint use_count;             // presumably a reference count
  MY_BITMAP slock_bitmap;
  uint32 slock[256/32]; // 256 bits for lock status of table
  uint32 table_id;
  uint32 table_version;
} NDB_SCHEMA_OBJECT;
static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
                                                my_bool create_if_not_exists,
                                                my_bool have_lock);
static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
                                   bool have_lock);
182
183 #ifndef DBUG_OFF
184 /* purecov: begin deadcode */
print_records(TABLE * table,const uchar * record)185 static void print_records(TABLE *table, const uchar *record)
186 {
187 for (uint j= 0; j < table->s->fields; j++)
188 {
189 char buf[40];
190 int pos= 0;
191 Field *field= table->field[j];
192 const uchar* field_ptr= field->ptr - table->record[0] + record;
193 int pack_len= field->pack_length();
194 int n= pack_len < 10 ? pack_len : 10;
195
196 for (int i= 0; i < n && pos < 20; i++)
197 {
198 pos+= sprintf(&buf[pos]," %x", (int) (uchar) field_ptr[i]);
199 }
200 buf[pos]= 0;
201 DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
202 }
203 }
204 /* purecov: end */
205 #else
206 #define print_records(a,b)
207 #endif
208
209
#ifndef DBUG_OFF
/*
  Debug helper: print the table's identity, record layout and, for every
  field, its name, flags, type, pack length, and offsets of ptr/null_ptr
  relative to record[0] to the DBUG trace. BIT fields additionally print
  their bit_ptr/bit_ofs/bit_len. A NULL table prints "<info>: (null)".

  NOTE(review): for non-nullable fields f->null_ptr is presumably NULL,
  so the printed null_ptr offset is meaningless in that case.
*/
static void dbug_print_table(const char *info, TABLE *table)
{
  if (table == 0)
  {
    DBUG_PRINT("info",("%s: (null)", info));
    return;
  }
  DBUG_PRINT("info",
             ("%s: %s.%s s->fields: %d "
              "reclength: %lu rec_buff_length: %u record[0]: 0x%lx "
              "record[1]: 0x%lx",
              info,
              table->s->db.str,
              table->s->table_name.str,
              table->s->fields,
              table->s->reclength,
              table->s->rec_buff_length,
              (long) table->record[0],
              (long) table->record[1]));

  for (unsigned int i= 0; i < table->s->fields; i++)
  {
    Field *f= table->field[i];
    DBUG_PRINT("info",
               ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d pack_length: %d "
                "ptr: 0x%lx[+%d] null_bit: %u null_ptr: 0x%lx[+%d]",
                i,
                f->field_name,
                (long) f->flags,
                (f->flags & PRI_KEY_FLAG) ? "pri" : "attr",
                (f->flags & NOT_NULL_FLAG) ? "" : ",nullable",
                (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
                (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
                (f->flags & BLOB_FLAG) ? ",blob" : "",
                (f->flags & BINARY_FLAG) ? ",binary" : "",
                f->real_type(),
                f->pack_length(),
                (long) f->ptr, (int) (f->ptr - table->record[0]),
                f->null_bit,
                (long) f->null_ptr,
                (int) ((uchar*) f->null_ptr - table->record[0])));
    if (f->type() == MYSQL_TYPE_BIT)
    {
      // BIT fields may store some bits outside ptr, so show that layout too
      Field_bit *g= (Field_bit*) f;
      DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d bit_ptr: 0x%lx[+%d] "
                                   "bit_ofs: %d bit_len: %u",
                                   g->field_length, (long) g->bit_ptr,
                                   (int) ((uchar*) g->bit_ptr -
                                          table->record[0]),
                                   g->bit_ofs, g->bit_len));
    }
  }
}
#else
#define dbug_print_table(a,b)
#endif
267
268
269 static inline void
print_warning_list(const char * prefix,THD * thd)270 print_warning_list(const char* prefix, THD* thd)
271 {
272 Diagnostics_area::Sql_condition_iterator
273 it(thd->get_stmt_da()->sql_conditions());
274
275 const Sql_condition *err;
276 while ((err= it++))
277 {
278 sql_print_warning("%s: (%d)%s",
279 prefix,
280 err->get_sql_errno(),
281 err->get_message_text());
282 }
283 }
284
285
run_query(THD * thd,char * buf,char * end,const int * no_print_error)286 static void run_query(THD *thd, char *buf, char *end,
287 const int *no_print_error)
288 {
289 /*
290 NOTE! Don't use this function for new implementation, backward
291 compat. only
292 */
293
294 Ndb_local_connection mysqld(thd);
295
296 /*
297 Run the query, suppress some errors from being printed
298 to log and ignore any error returned
299 */
300 (void)mysqld.raw_run_query(buf, (end - buf),
301 no_print_error);
302 }
303
304 static void
ndbcluster_binlog_close_table(THD * thd,NDB_SHARE * share)305 ndbcluster_binlog_close_table(THD *thd, NDB_SHARE *share)
306 {
307 DBUG_ENTER("ndbcluster_binlog_close_table");
308 Ndb_event_data *event_data= share->event_data;
309 if (event_data)
310 {
311 delete event_data;
312 share->event_data= 0;
313 }
314 DBUG_VOID_RETURN;
315 }
316
317
/*
  Open a shadow table for the table given in share.
  - The shadow table is (mainly) used when an event is
    received from the data nodes which needs to be written
    to the binlog injector.

  Creates share->event_data (a new Ndb_event_data) and builds the shadow
  TABLE_SHARE/TABLE inside event_data->mem_root. While doing so the
  thread's THR_MALLOC root is temporarily redirected to that mem_root so
  that allocations made by the open functions also land there; the
  previous root is restored on every exit path.

  Returns 0 on success. On failure returns the error from
  open_table_def()/open_table_from_share(), deletes event_data and
  resets share->event_data to 0.

  NOTE(review): the results of new and alloc_root() are not checked.
*/

static int
ndb_binlog_open_shadow_table(THD *thd, NDB_SHARE *share)
{
  int error;
  // The event data must not already exist; it is created here
  DBUG_ASSERT(share->event_data == 0);
  Ndb_event_data *event_data= share->event_data= new Ndb_event_data(share);
  DBUG_ENTER("ndb_binlog_open_shadow_table");

  // Redirect this thread's default MEM_ROOT to event_data->mem_root
  MEM_ROOT **root_ptr=
    my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
  MEM_ROOT *old_root= *root_ptr;
  init_sql_alloc(&event_data->mem_root, 1024, 0);
  *root_ptr= &event_data->mem_root;

  TABLE_SHARE *shadow_table_share=
    (TABLE_SHARE*)alloc_root(&event_data->mem_root, sizeof(TABLE_SHARE));
  TABLE *shadow_table=
    (TABLE*)alloc_root(&event_data->mem_root, sizeof(TABLE));

  init_tmp_table_share(thd, shadow_table_share,
                       share->db, 0,
                       share->table_name,
                       share->key);
  if ((error= open_table_def(thd, shadow_table_share, 0)) ||
      (error= open_table_from_share(thd, shadow_table_share, "", 0,
                                    (uint) (OPEN_FRM_FILE_ONLY | DELAYED_OPEN | READ_ALL),
                                    0, shadow_table,
#ifdef NDB_WITHOUT_ONLINE_ALTER
                                    false
#else
                                    OTM_OPEN
#endif
                                    )))
  {
    DBUG_PRINT("error", ("failed to open shadow table, error: %d my_errno: %d",
                         error, my_errno));
    free_table_share(shadow_table_share);
    delete event_data;
    share->event_data= 0;
    *root_ptr= old_root;
    DBUG_RETURN(error);
  }
  event_data->shadow_table= shadow_table;

  // Assign a table id to the shadow share while holding LOCK_open
  mysql_mutex_lock(&LOCK_open);
  assign_new_table_id(shadow_table_share);
  mysql_mutex_unlock(&LOCK_open);

  shadow_table->in_use= injector_thd;

  // Point the share's db/table names at the strings owned by the NDB_SHARE
  shadow_table->s->db.str= share->db;
  shadow_table->s->db.length= strlen(share->db);
  shadow_table->s->table_name.str= share->table_name;
  shadow_table->s->table_name.length= strlen(share->table_name);
  /* We can't use 'use_all_columns()' as the file object is not setup yet */
  shadow_table->column_bitmaps_set_no_signal(&shadow_table->s->all_set,
                                             &shadow_table->s->all_set);
#ifndef DBUG_OFF
  dbug_print_table("table", shadow_table);
#endif
  *root_ptr= old_root;
  DBUG_RETURN(0);
}
388
389
390 /*
391 Initialize the binlog part of the NDB_SHARE
392 */
ndbcluster_binlog_init_share(THD * thd,NDB_SHARE * share,TABLE * _table)393 int ndbcluster_binlog_init_share(THD *thd, NDB_SHARE *share, TABLE *_table)
394 {
395 MEM_ROOT *mem_root= &share->mem_root;
396 int do_event_op= ndb_binlog_running;
397 int error= 0;
398 DBUG_ENTER("ndbcluster_binlog_init_share");
399
400 share->connect_count= g_ndb_cluster_connection->get_connect_count();
401 #ifdef HAVE_NDB_BINLOG
402 share->m_cfn_share= NULL;
403 #endif
404
405 share->op= 0;
406 share->new_op= 0;
407 share->event_data= 0;
408
409 if (!ndb_schema_share &&
410 strcmp(share->db, NDB_REP_DB) == 0 &&
411 strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
412 do_event_op= 1;
413 else if (!ndb_apply_status_share &&
414 strcmp(share->db, NDB_REP_DB) == 0 &&
415 strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
416 do_event_op= 1;
417
418 if (Ndb_dist_priv_util::is_distributed_priv_table(share->db,
419 share->table_name))
420 {
421 do_event_op= 0;
422 }
423
424 {
425 int i, no_nodes= g_ndb_cluster_connection->no_db_nodes();
426 share->subscriber_bitmap= (MY_BITMAP*)
427 alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP));
428 for (i= 0; i < no_nodes; i++)
429 {
430 bitmap_init(&share->subscriber_bitmap[i],
431 (Uint32*)alloc_root(mem_root, max_ndb_nodes/8),
432 max_ndb_nodes, FALSE);
433 bitmap_clear_all(&share->subscriber_bitmap[i]);
434 }
435 }
436
437 if (!do_event_op)
438 {
439 if (_table)
440 {
441 if (_table->s->primary_key == MAX_KEY)
442 share->flags|= NSF_HIDDEN_PK;
443 if (_table->s->blob_fields != 0)
444 share->flags|= NSF_BLOB_FLAG;
445 }
446 else
447 {
448 share->flags|= NSF_NO_BINLOG;
449 }
450 DBUG_RETURN(error);
451 }
452 while (1)
453 {
454 if ((error= ndb_binlog_open_shadow_table(thd, share)))
455 break;
456 if (share->event_data->shadow_table->s->primary_key == MAX_KEY)
457 share->flags|= NSF_HIDDEN_PK;
458 if (share->event_data->shadow_table->s->blob_fields != 0)
459 share->flags|= NSF_BLOB_FLAG;
460 break;
461 }
462 DBUG_RETURN(error);
463 }
464
465 /*****************************************************************
466 functions called from master sql client threads
467 ****************************************************************/
468
469 /*
470 called in mysql_show_binlog_events and reset_logs to make sure we wait for
471 all events originating from this mysql server to arrive in the binlog
472
473 Wait for the last epoch in which the last transaction is a part of.
474
475 Wait a maximum of 30 seconds.
476 */
ndbcluster_binlog_wait(THD * thd)477 static void ndbcluster_binlog_wait(THD *thd)
478 {
479 if (ndb_binlog_running)
480 {
481 DBUG_ENTER("ndbcluster_binlog_wait");
482 ulonglong wait_epoch= ndb_get_latest_trans_gci();
483 /*
484 cluster not connected or no transactions done
485 so nothing to wait for
486 */
487 if (!wait_epoch)
488 DBUG_VOID_RETURN;
489 const char *save_info= thd ? thd->proc_info : 0;
490 int count= 30;
491 if (thd)
492 thd->proc_info= "Waiting for ndbcluster binlog update to "
493 "reach current position";
494 pthread_mutex_lock(&injector_mutex);
495 while (!(thd && thd->killed) && count && ndb_binlog_running &&
496 (ndb_latest_handled_binlog_epoch == 0 ||
497 ndb_latest_handled_binlog_epoch < wait_epoch))
498 {
499 count--;
500 struct timespec abstime;
501 set_timespec(abstime, 1);
502 pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
503 }
504 pthread_mutex_unlock(&injector_mutex);
505 if (thd)
506 thd->proc_info= save_info;
507 DBUG_VOID_RETURN;
508 }
509 }
510
511 /*
512 Called from MYSQL_BIN_LOG::reset_logs in log.cc when binlog is emptied
513 */
ndbcluster_reset_logs(THD * thd)514 static int ndbcluster_reset_logs(THD *thd)
515 {
516 if (!ndb_binlog_running)
517 return 0;
518
519 /* only reset master should reset logs */
520 if (!((thd->lex->sql_command == SQLCOM_RESET) &&
521 (thd->lex->type & REFRESH_MASTER)))
522 return 0;
523
524 DBUG_ENTER("ndbcluster_reset_logs");
525
526 /*
527 Wait for all events originating from this mysql server has
528 reached the binlog before continuing to reset
529 */
530 ndbcluster_binlog_wait(thd);
531
532 /*
533 Truncate mysql.ndb_binlog_index table, if table does not
534 exist ignore the error as it is a "consistent" behavior
535 */
536 Ndb_local_connection mysqld(thd);
537 const bool ignore_no_such_table = true;
538 if(mysqld.truncate_table(STRING_WITH_LEN("mysql"),
539 STRING_WITH_LEN("ndb_binlog_index"),
540 ignore_no_such_table))
541 {
542 // Failed to truncate table
543 DBUG_RETURN(1);
544 }
545 DBUG_RETURN(0);
546 }
547
/*
  Setup THD object
  'Inspired' from ha_ndbcluster.cc : ndb_util_thread_func

  Creates a daemon THD (COM_DAEMON, SYSTEM_THREAD_NDBCLUSTER_BINLOG,
  grants skipped, utf8 connection charsets) for a thread that has none.

  stackptr: address of a local variable in the caller, recorded as the
            THD's thread_stack.
  Returns the new THD, or 0 if it could not be created or
  store_globals() failed. The caller owns the THD and must cleanup() and
  delete it.

  NOTE(review): the result of get_charset_by_csname() is not checked.
*/
static THD *
setup_thd(char * stackptr)
{
  DBUG_ENTER("setup_thd");
  THD * thd= new THD; /* note that constructor of THD uses DBUG_ */
  if (thd == 0)
  {
    DBUG_RETURN(0);
  }
  THD_CHECK_SENTRY(thd);

  thd->thread_id= 0;
  thd->thread_stack= stackptr; /* remember where our stack is */
  // Bind the THD to the current thread (presumably sets current_thd)
  if (thd->store_globals())
  {
    thd->cleanup();
    delete thd;
    DBUG_RETURN(0);
  }

  lex_start(thd);

  thd->init_for_queries();
  thd_set_command(thd, COM_DAEMON);
  thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
#ifndef NDB_THD_HAS_NO_VERSION
  thd->version= refresh_version;
#endif
  thd->client_capabilities= 0;
  thd->lex->start_transaction_opt= 0;
  thd->security_ctx->skip_grants();

  // Use utf8 for client, results and connection collation
  CHARSET_INFO *charset_connection= get_charset_by_csname("utf8",
                                                          MY_CS_PRIMARY,
                                                          MYF(MY_WME));
  thd->variables.character_set_client= charset_connection;
  thd->variables.character_set_results= charset_connection;
  thd->variables.collation_connection= charset_connection;
  thd->update_charset();
  DBUG_RETURN(thd);
}
593
594 /*
595 Called from MYSQL_BIN_LOG::purge_logs in log.cc when the binlog "file"
596 is removed
597 */
598
599 static int
ndbcluster_binlog_index_purge_file(THD * thd,const char * file)600 ndbcluster_binlog_index_purge_file(THD *thd, const char *file)
601 {
602 int error = 0;
603 THD * save_thd= thd;
604 DBUG_ENTER("ndbcluster_binlog_index_purge_file");
605 DBUG_PRINT("enter", ("file: %s", file));
606
607 if (!ndb_binlog_running || (thd && thd->slave_thread))
608 DBUG_RETURN(0);
609
610 /**
611 * This function really really needs a THD object,
612 * new/delete one if not available...yuck!
613 */
614 if (thd == 0)
615 {
616 if ((thd = setup_thd((char*)&save_thd)) == 0)
617 {
618 /**
619 * TODO return proper error code here,
620 * BUT! return code is not (currently) checked in
621 * log.cc : purge_index_entry() so we settle for warning printout
622 */
623 sql_print_warning("NDB: Unable to purge "
624 NDB_REP_DB "." NDB_REP_TABLE
625 " File=%s (failed to setup thd)", file);
626 DBUG_RETURN(0);
627 }
628 }
629
630 /*
631 delete rows from mysql.ndb_binlog_index table for the given
632 filename, if table does not exist ignore the error as it
633 is a "consistent" behavior
634 */
635 Ndb_local_connection mysqld(thd);
636 const bool ignore_no_such_table = true;
637 if(mysqld.delete_rows(STRING_WITH_LEN("mysql"),
638 STRING_WITH_LEN("ndb_binlog_index"),
639 ignore_no_such_table,
640 "File='", file, "'", NULL))
641 {
642 // Failed to delete rows from table
643 error = 1;
644 }
645
646 if (save_thd == 0)
647 {
648 thd->cleanup();
649 delete thd;
650 }
651
652 DBUG_RETURN(error);
653 }
654
655
656 #ifndef NDB_WITHOUT_DIST_PRIV
657 // Determine if privilege tables are distributed, ie. stored in NDB
658 static bool
priv_tables_are_in_ndb(THD * thd)659 priv_tables_are_in_ndb(THD *thd)
660 {
661 bool distributed= false;
662 Ndb_dist_priv_util dist_priv;
663 DBUG_ENTER("ndbcluster_distributed_privileges");
664
665 Ndb *ndb= check_ndb_in_thd(thd);
666 if (!ndb)
667 DBUG_RETURN(false); // MAGNUS, error message?
668
669 if (ndb->setDatabaseName(dist_priv.database()) != 0)
670 DBUG_RETURN(false);
671
672 const char* table_name;
673 while((table_name= dist_priv.iter_next_table()))
674 {
675 DBUG_PRINT("info", ("table_name: %s", table_name));
676 Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name);
677 const NDBTAB *ndbtab= ndbtab_g.get_table();
678 if (ndbtab)
679 {
680 distributed= true;
681 }
682 else if (distributed)
683 {
684 sql_print_error("NDB: Inconsistency detected in distributed "
685 "privilege tables. Table '%s.%s' is not distributed",
686 dist_priv.database(), table_name);
687 DBUG_RETURN(false);
688 }
689 }
690 DBUG_RETURN(distributed);
691 }
692 #endif
693
694 static void
ndbcluster_binlog_log_query(handlerton * hton,THD * thd,enum_binlog_command binlog_command,const char * query,uint query_length,const char * db,const char * table_name)695 ndbcluster_binlog_log_query(handlerton *hton, THD *thd, enum_binlog_command binlog_command,
696 const char *query, uint query_length,
697 const char *db, const char *table_name)
698 {
699 DBUG_ENTER("ndbcluster_binlog_log_query");
700 DBUG_PRINT("enter", ("db: %s table_name: %s query: %s",
701 db, table_name, query));
702 enum SCHEMA_OP_TYPE type;
703 int log= 0;
704 uint32 table_id= 0, table_version= 0;
705 /*
706 Since databases aren't real ndb schema object
707 they don't have any id/version
708
709 But since that id/version is used to make sure that event's on SCHEMA_TABLE
710 is correct, we set random numbers
711 */
712 table_id = (uint32)rand();
713 table_version = (uint32)rand();
714 switch (binlog_command)
715 {
716 case LOGCOM_CREATE_TABLE:
717 type= SOT_CREATE_TABLE;
718 DBUG_ASSERT(FALSE);
719 break;
720 case LOGCOM_ALTER_TABLE:
721 type= SOT_ALTER_TABLE_COMMIT;
722 //log= 1;
723 break;
724 case LOGCOM_RENAME_TABLE:
725 type= SOT_RENAME_TABLE;
726 DBUG_ASSERT(FALSE);
727 break;
728 case LOGCOM_DROP_TABLE:
729 type= SOT_DROP_TABLE;
730 DBUG_ASSERT(FALSE);
731 break;
732 case LOGCOM_CREATE_DB:
733 type= SOT_CREATE_DB;
734 log= 1;
735 break;
736 case LOGCOM_ALTER_DB:
737 type= SOT_ALTER_DB;
738 log= 1;
739 break;
740 case LOGCOM_DROP_DB:
741 type= SOT_DROP_DB;
742 DBUG_ASSERT(FALSE);
743 break;
744 #ifndef NDB_WITHOUT_DIST_PRIV
745 case LOGCOM_CREATE_USER:
746 type= SOT_CREATE_USER;
747 if (priv_tables_are_in_ndb(thd))
748 {
749 DBUG_PRINT("info", ("Privilege tables have been distributed, logging statement"));
750 log= 1;
751 }
752 break;
753 case LOGCOM_DROP_USER:
754 type= SOT_DROP_USER;
755 if (priv_tables_are_in_ndb(thd))
756 {
757 DBUG_PRINT("info", ("Privilege tables have been distributed, logging statement"));
758 log= 1;
759 }
760 break;
761 case LOGCOM_RENAME_USER:
762 type= SOT_RENAME_USER;
763 if (priv_tables_are_in_ndb(thd))
764 {
765 DBUG_PRINT("info", ("Privilege tables have been distributed, logging statement"));
766 log= 1;
767 }
768 break;
769 case LOGCOM_GRANT:
770 type= SOT_GRANT;
771 if (priv_tables_are_in_ndb(thd))
772 {
773 DBUG_PRINT("info", ("Privilege tables have been distributed, logging statement"));
774 log= 1;
775 }
776 break;
777 case LOGCOM_REVOKE:
778 type= SOT_REVOKE;
779 if (priv_tables_are_in_ndb(thd))
780 {
781 DBUG_PRINT("info", ("Privilege tables have been distributed, logging statement"));
782 log= 1;
783 }
784 break;
785 #endif
786 }
787 if (log)
788 {
789 ndbcluster_log_schema_op(thd, query, query_length,
790 db, table_name, table_id, table_version, type,
791 NULL, NULL);
792 }
793 DBUG_VOID_RETURN;
794 }
795
796
/*
  End use of the NDB Cluster binlog
  - wait for binlog thread to shutdown

  Before that, it stops the Cluster Utility thread and the Cluster Index
  Stats thread if they are running. Finally it destroys injector_mutex,
  injector_cond and ndb_schema_share_mutex. Reached via
  ndbcluster_binlog_func(BFN_BINLOG_END). Always returns 0.
*/

int ndbcluster_binlog_end(THD *thd)
{
  DBUG_ENTER("ndbcluster_binlog_end");

  if (ndb_util_thread_running > 0)
  {
    /*
      Wait for util thread to die (as this uses the injector mutex)
      There is a very small chance that ndb_util_thread dies and the
      following mutex is freed before it's accessed. This shouldn't
      however be a likely case as the ndbcluster_binlog_end is supposed to
      be called before ndb_cluster_end().
    */
    sql_print_information("Stopping Cluster Utility thread");
    pthread_mutex_lock(&LOCK_ndb_util_thread);
    /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
    ndb_util_thread_running++;
    ndbcluster_terminating= 1;
    pthread_cond_signal(&COND_ndb_util_thread);
    // Wait until only our own increment remains (presumably the util
    // thread decrements the counter when it exits)
    while (ndb_util_thread_running > 1)
      pthread_cond_wait(&COND_ndb_util_ready, &LOCK_ndb_util_thread);
    ndb_util_thread_running--;
    pthread_mutex_unlock(&LOCK_ndb_util_thread);
  }

  if (ndb_index_stat_thread_running > 0)
  {
    /*
      Index stats thread blindly imitates util thread. Following actually
      fixes some "[Warning] Plugin 'ndbcluster' will be forced to shutdown".
    */
    sql_print_information("Stopping Cluster Index Stats thread");
    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
    /* Ensure mutex are not freed if ndb_cluster_end is running at same time */
    ndb_index_stat_thread_running++;
    ndbcluster_terminating= 1;
    pthread_cond_signal(&COND_ndb_index_stat_thread);
    // Same handshake as for the util thread above
    while (ndb_index_stat_thread_running > 1)
      pthread_cond_wait(&COND_ndb_index_stat_ready, &LOCK_ndb_index_stat_thread);
    ndb_index_stat_thread_running--;
    pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
  }

  if (ndbcluster_binlog_inited)
  {
    ndbcluster_binlog_inited= 0;
    if (ndb_binlog_thread_running)
    {
      /* wait for injector thread to finish */
      ndbcluster_binlog_terminating= 1;
      pthread_mutex_lock(&injector_mutex);
      pthread_cond_signal(&injector_cond);
      // injector_cond is reused for the shutdown handshake
      while (ndb_binlog_thread_running > 0)
        pthread_cond_wait(&injector_cond, &injector_mutex);
      pthread_mutex_unlock(&injector_mutex);
    }
    pthread_mutex_destroy(&injector_mutex);
    pthread_cond_destroy(&injector_cond);
    pthread_mutex_destroy(&ndb_schema_share_mutex);
  }

  DBUG_RETURN(0);
}
865
866 /*****************************************************************
867 functions called from slave sql client threads
868 ****************************************************************/
ndbcluster_reset_slave(THD * thd)869 static void ndbcluster_reset_slave(THD *thd)
870 {
871 int error = 0;
872 if (!ndb_binlog_running)
873 return;
874
875 DBUG_ENTER("ndbcluster_reset_slave");
876
877 /*
878 delete all rows from mysql.ndb_apply_status table
879 - if table does not exist ignore the error as it
880 is a consistent behavior
881 */
882 Ndb_local_connection mysqld(thd);
883 const bool ignore_no_such_table = true;
884 if(mysqld.delete_rows(STRING_WITH_LEN("mysql"),
885 STRING_WITH_LEN("ndb_apply_status"),
886 ignore_no_such_table,
887 NULL))
888 {
889 // Failed to delete rows from table
890 error = 1;
891 }
892
893 g_ndb_slave_state.atResetSlave();
894
895 // pending fix for bug#59844 will make this function return int
896 DBUG_VOID_RETURN;
897 }
898
/*
  Binlog-related handlerton callbacks. They are installed on the ndb
  handlerton by ndbcluster_binlog_init_handlerton().
*/
902
903 /**
904 Upon the sql command flush logs, we need to ensure that all outstanding
905 ndb data to be logged has made it to the binary log to get a deterministic
906 behavior on the rotation of the log.
907 */
ndbcluster_flush_logs(handlerton * hton)908 static bool ndbcluster_flush_logs(handlerton *hton)
909 {
910 ndbcluster_binlog_wait(current_thd);
911 return FALSE;
912 }
913
914
ndbcluster_binlog_func(handlerton * hton,THD * thd,enum_binlog_func fn,void * arg)915 static int ndbcluster_binlog_func(handlerton *hton, THD *thd,
916 enum_binlog_func fn,
917 void *arg)
918 {
919 DBUG_ENTER("ndbcluster_binlog_func");
920 int res= 0;
921 switch(fn)
922 {
923 case BFN_RESET_LOGS:
924 res= ndbcluster_reset_logs(thd);
925 break;
926 case BFN_RESET_SLAVE:
927 ndbcluster_reset_slave(thd);
928 break;
929 case BFN_BINLOG_WAIT:
930 ndbcluster_binlog_wait(thd);
931 break;
932 case BFN_BINLOG_END:
933 res= ndbcluster_binlog_end(thd);
934 break;
935 case BFN_BINLOG_PURGE_FILE:
936 res= ndbcluster_binlog_index_purge_file(thd, (const char *)arg);
937 break;
938 }
939 DBUG_RETURN(res);
940 }
941
ndbcluster_binlog_init_handlerton()942 void ndbcluster_binlog_init_handlerton()
943 {
944 handlerton *h= ndbcluster_hton;
945 h->flush_logs= ndbcluster_flush_logs;
946 h->binlog_func= ndbcluster_binlog_func;
947 h->binlog_log_query= ndbcluster_binlog_log_query;
948 }
949
950
951 /*
952 Convert db and table name into a key to use for searching
953 the ndbcluster_open_tables hash
954 */
955 static size_t
ndb_open_tables__create_key(char * key_buf,size_t key_buf_length,const char * db,size_t db_length,const char * table,size_t table_length)956 ndb_open_tables__create_key(char* key_buf, size_t key_buf_length,
957 const char* db, size_t db_length,
958 const char* table, size_t table_length)
959 {
960 size_t key_length = my_snprintf(key_buf, key_buf_length,
961 "./%*s/%*s", db_length, db,
962 table_length, table) - 1;
963 assert(key_length > 0);
964 assert(key_length < key_buf_length);
965
966 return key_length;
967 }
968
969
970 /*
971 Check if table with given name is open, ie. is
972 in ndbcluster_open_tables hash
973 */
974 static bool
ndb_open_tables__is_table_open(const char * db,size_t db_length,const char * table,size_t table_length)975 ndb_open_tables__is_table_open(const char* db, size_t db_length,
976 const char* table, size_t table_length)
977 {
978 char key[FN_REFLEN + 1];
979 size_t key_length = ndb_open_tables__create_key(key, sizeof(key),
980 db, db_length,
981 table, table_length);
982 DBUG_ENTER("ndb_open_tables__is_table_open");
983 DBUG_PRINT("enter", ("db: '%s', table: '%s', key: '%s'",
984 db, table, key));
985
986 pthread_mutex_lock(&ndbcluster_mutex);
987 bool result = my_hash_search(&ndbcluster_open_tables,
988 (const uchar*)key,
989 key_length) != NULL;
990 pthread_mutex_unlock(&ndbcluster_mutex);
991
992 DBUG_PRINT("exit", ("result: %d", result));
993 DBUG_RETURN(result);
994 }
995
996
997 static bool
ndbcluster_check_ndb_schema_share()998 ndbcluster_check_ndb_schema_share()
999 {
1000 return ndb_open_tables__is_table_open(STRING_WITH_LEN("mysql"),
1001 STRING_WITH_LEN("ndb_schema"));
1002 }
1003
1004
1005 static bool
ndbcluster_check_ndb_apply_status_share()1006 ndbcluster_check_ndb_apply_status_share()
1007 {
1008 return ndb_open_tables__is_table_open(STRING_WITH_LEN("mysql"),
1009 STRING_WITH_LEN("ndb_apply_status"));
1010 }
1011
1012
1013 static bool
create_cluster_sys_table(THD * thd,const char * db,size_t db_length,const char * table,size_t table_length,const char * create_definitions,const char * create_options)1014 create_cluster_sys_table(THD *thd, const char* db, size_t db_length,
1015 const char* table, size_t table_length,
1016 const char* create_definitions,
1017 const char* create_options)
1018 {
1019 if (ndb_open_tables__is_table_open(db, db_length, table, table_length))
1020 return false;
1021
1022 if (g_ndb_cluster_connection->get_no_ready() <= 0)
1023 return false;
1024
1025 if (opt_ndb_extra_logging)
1026 sql_print_information("NDB: Creating %s.%s", db, table);
1027
1028 Ndb_local_connection mysqld(thd);
1029
1030 /*
1031 Check if table exists in MySQL "dictionary"(i.e on disk)
1032 if so, remove it since there is none in Ndb
1033 */
1034 {
1035 char path[FN_REFLEN + 1];
1036 build_table_filename(path, sizeof(path) - 1,
1037 db, table, reg_ext, 0);
1038 if (my_delete(path, MYF(0)) == 0)
1039 {
1040 /*
1041 The .frm file existed and was deleted from disk.
1042 It's possible that someone has tried to use it and thus
1043 it might have been inserted in the table definition cache.
1044 It must be flushed to avoid that it exist only in the
1045 table definition cache.
1046 */
1047 if (opt_ndb_extra_logging)
1048 sql_print_information("NDB: Flushing %s.%s", db, table);
1049
1050 /* Flush mysql.ndb_apply_status table, ignore all errors */
1051 (void)mysqld.flush_table(db, db_length,
1052 table, table_length);
1053 }
1054 }
1055
1056 const bool create_if_not_exists = true;
1057 const bool res = mysqld.create_sys_table(db, db_length,
1058 table, table_length,
1059 create_if_not_exists,
1060 create_definitions,
1061 create_options);
1062 return res;
1063 }
1064
1065
1066 static bool
ndb_apply_table__create(THD * thd)1067 ndb_apply_table__create(THD *thd)
1068 {
1069 DBUG_ENTER("ndb_apply_table__create");
1070
1071 /* NOTE! Updating this table schema must be reflected in ndb_restore */
1072 const bool res =
1073 create_cluster_sys_table(thd,
1074 STRING_WITH_LEN("mysql"),
1075 STRING_WITH_LEN("ndb_apply_status"),
1076 // table_definition
1077 "server_id INT UNSIGNED NOT NULL,"
1078 "epoch BIGINT UNSIGNED NOT NULL, "
1079 "log_name VARCHAR(255) BINARY NOT NULL, "
1080 "start_pos BIGINT UNSIGNED NOT NULL, "
1081 "end_pos BIGINT UNSIGNED NOT NULL, "
1082 "PRIMARY KEY USING HASH (server_id)",
1083 // table_options
1084 "ENGINE=NDB CHARACTER SET latin1");
1085 DBUG_RETURN(res);
1086 }
1087
1088
1089 static bool
ndb_schema_table__create(THD * thd)1090 ndb_schema_table__create(THD *thd)
1091 {
1092 DBUG_ENTER("ndb_schema_table__create");
1093
1094 /* NOTE! Updating this table schema must be reflected in ndb_restore */
1095 const bool res =
1096 create_cluster_sys_table(thd,
1097 STRING_WITH_LEN("mysql"),
1098 STRING_WITH_LEN("ndb_schema"),
1099 // table_definition
1100 "db VARBINARY(63) NOT NULL,"
1101 "name VARBINARY(63) NOT NULL,"
1102 "slock BINARY(32) NOT NULL,"
1103 "query BLOB NOT NULL,"
1104 "node_id INT UNSIGNED NOT NULL,"
1105 "epoch BIGINT UNSIGNED NOT NULL,"
1106 "id INT UNSIGNED NOT NULL,"
1107 "version INT UNSIGNED NOT NULL,"
1108 "type INT UNSIGNED NOT NULL,"
1109 "PRIMARY KEY USING HASH (db,name)",
1110 // table_options
1111 "ENGINE=NDB CHARACTER SET latin1");
1112 DBUG_RETURN(res);
1113 }
1114
1115 class Thd_ndb_options_guard
1116 {
1117 public:
Thd_ndb_options_guard(Thd_ndb * thd_ndb)1118 Thd_ndb_options_guard(Thd_ndb *thd_ndb)
1119 : m_val(thd_ndb->options), m_save_val(thd_ndb->options) {}
~Thd_ndb_options_guard()1120 ~Thd_ndb_options_guard() { m_val= m_save_val; }
set(uint32 flag)1121 void set(uint32 flag) { m_val|= flag; }
1122 private:
1123 uint32 &m_val;
1124 uint32 m_save_val;
1125 };
1126
1127 extern int ndb_setup_complete;
1128 extern pthread_cond_t COND_ndb_setup_complete;
1129
1130 /*
1131 ndb_notify_tables_writable
1132
1133 Called to notify any waiting threads that Ndb tables are
1134 now writable
1135 */
ndb_notify_tables_writable()1136 static void ndb_notify_tables_writable()
1137 {
1138 pthread_mutex_lock(&ndbcluster_mutex);
1139 ndb_setup_complete= 1;
1140 pthread_cond_broadcast(&COND_ndb_setup_complete);
1141 pthread_mutex_unlock(&ndbcluster_mutex);
1142 }
1143
1144 /*
1145 Ndb has no representation of the database schema objects.
1146 The mysql.ndb_schema table contains the latest schema operations
1147 done via a mysqld, and thus reflects databases created/dropped/altered
1148 while a mysqld was disconnected. This function tries to recover
1149 the correct state w.r.t created databases using the information in
1150 that table.
1151
1152
1153 */
ndbcluster_find_all_databases(THD * thd)1154 static int ndbcluster_find_all_databases(THD *thd)
1155 {
1156 Ndb *ndb= check_ndb_in_thd(thd);
1157 Thd_ndb *thd_ndb= get_thd_ndb(thd);
1158 Thd_ndb_options_guard thd_ndb_options(thd_ndb);
1159 NDBDICT *dict= ndb->getDictionary();
1160 NdbTransaction *trans= NULL;
1161 NdbError ndb_error;
1162 int retries= 100;
1163 int retry_sleep= 30; /* 30 milliseconds, transaction */
1164 DBUG_ENTER("ndbcluster_find_all_databases");
1165
1166 /*
1167 Function should only be called while ndbcluster_global_schema_lock
1168 is held, to ensure that ndb_schema table is not being updated while
1169 scanning.
1170 */
1171 if (!thd_ndb->has_required_global_schema_lock("ndbcluster_find_all_databases"))
1172 DBUG_RETURN(1);
1173
1174 ndb->setDatabaseName(NDB_REP_DB);
1175 thd_ndb_options.set(TNO_NO_LOG_SCHEMA_OP);
1176 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
1177 while (1)
1178 {
1179 char db_buffer[FN_REFLEN];
1180 char *db= db_buffer+1;
1181 char name[FN_REFLEN];
1182 char query[64000];
1183 Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
1184 const NDBTAB *ndbtab= ndbtab_g.get_table();
1185 NdbScanOperation *op;
1186 NdbBlob *query_blob_handle;
1187 int r= 0;
1188 if (ndbtab == NULL)
1189 {
1190 ndb_error= dict->getNdbError();
1191 goto error;
1192 }
1193 trans= ndb->startTransaction();
1194 if (trans == NULL)
1195 {
1196 ndb_error= ndb->getNdbError();
1197 goto error;
1198 }
1199 op= trans->getNdbScanOperation(ndbtab);
1200 if (op == NULL)
1201 {
1202 ndb_error= trans->getNdbError();
1203 goto error;
1204 }
1205
1206 op->readTuples(NdbScanOperation::LM_Read,
1207 NdbScanOperation::SF_TupScan, 1);
1208
1209 r|= op->getValue("db", db_buffer) == NULL;
1210 r|= op->getValue("name", name) == NULL;
1211 r|= (query_blob_handle= op->getBlobHandle("query")) == NULL;
1212 r|= query_blob_handle->getValue(query, sizeof(query));
1213
1214 if (r)
1215 {
1216 ndb_error= op->getNdbError();
1217 goto error;
1218 }
1219
1220 if (trans->execute(NdbTransaction::NoCommit))
1221 {
1222 ndb_error= trans->getNdbError();
1223 goto error;
1224 }
1225
1226 while ((r= op->nextResult()) == 0)
1227 {
1228 unsigned db_len= db_buffer[0];
1229 unsigned name_len= name[0];
1230 /*
1231 name_len == 0 means no table name, hence the row
1232 is for a database
1233 */
1234 if (db_len > 0 && name_len == 0)
1235 {
1236 /* database found */
1237 db[db_len]= 0;
1238
1239 /* find query */
1240 Uint64 query_length= 0;
1241 if (query_blob_handle->getLength(query_length))
1242 {
1243 ndb_error= query_blob_handle->getNdbError();
1244 goto error;
1245 }
1246 query[query_length]= 0;
1247 build_table_filename(name, sizeof(name), db, "", "", 0);
1248 int database_exists= !my_access(name, F_OK);
1249 if (strncasecmp("CREATE", query, 6) == 0)
1250 {
1251 /* Database should exist */
1252 if (!database_exists)
1253 {
1254 /* create missing database */
1255 sql_print_information("NDB: Discovered missing database '%s'", db);
1256 const int no_print_error[1]= {0};
1257 run_query(thd, query, query + query_length,
1258 no_print_error);
1259 }
1260 }
1261 else if (strncasecmp("ALTER", query, 5) == 0)
1262 {
1263 /* Database should exist */
1264 if (!database_exists)
1265 {
1266 /* create missing database */
1267 sql_print_information("NDB: Discovered missing database '%s'", db);
1268 const int no_print_error[1]= {0};
1269 name_len= my_snprintf(name, sizeof(name), "CREATE DATABASE %s", db);
1270 run_query(thd, name, name + name_len,
1271 no_print_error);
1272 run_query(thd, query, query + query_length,
1273 no_print_error);
1274 }
1275 }
1276 else if (strncasecmp("DROP", query, 4) == 0)
1277 {
1278 /* Database should not exist */
1279 if (database_exists)
1280 {
1281 /* drop missing database */
1282 sql_print_information("NDB: Discovered reamining database '%s'", db);
1283 }
1284 }
1285 }
1286 }
1287 if (r == -1)
1288 {
1289 ndb_error= op->getNdbError();
1290 goto error;
1291 }
1292 ndb->closeTransaction(trans);
1293 trans= NULL;
1294 DBUG_RETURN(0); // success
1295 error:
1296 if (trans)
1297 {
1298 ndb->closeTransaction(trans);
1299 trans= NULL;
1300 }
1301 if (ndb_error.status == NdbError::TemporaryError && !thd->killed)
1302 {
1303 if (retries--)
1304 {
1305 sql_print_warning("NDB: ndbcluster_find_all_databases retry: %u - %s",
1306 ndb_error.code,
1307 ndb_error.message);
1308 do_retry_sleep(retry_sleep);
1309 continue; // retry
1310 }
1311 }
1312 if (!thd->killed)
1313 {
1314 sql_print_error("NDB: ndbcluster_find_all_databases fail: %u - %s",
1315 ndb_error.code,
1316 ndb_error.message);
1317 }
1318
1319 DBUG_RETURN(1); // not temp error or too many retries
1320 }
1321 }
1322
1323 bool
ndb_binlog_setup(THD * thd)1324 ndb_binlog_setup(THD *thd)
1325 {
1326 if (ndb_binlog_tables_inited)
1327 return true; // Already setup -> OK
1328
1329 Ndb_global_schema_lock_guard global_schema_lock_guard(thd);
1330 if (global_schema_lock_guard.lock(false, false))
1331 return false;
1332 if (!ndb_schema_share &&
1333 ndbcluster_check_ndb_schema_share() == 0)
1334 {
1335 ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_SCHEMA_TABLE);
1336 if (!ndb_schema_share)
1337 {
1338 ndb_schema_table__create(thd);
1339 // always make sure we create the 'schema' first
1340 if (!ndb_schema_share)
1341 return false;
1342 }
1343 }
1344 if (!ndb_apply_status_share &&
1345 ndbcluster_check_ndb_apply_status_share() == 0)
1346 {
1347 ndb_create_table_from_engine(thd, NDB_REP_DB, NDB_APPLY_TABLE);
1348 if (!ndb_apply_status_share)
1349 {
1350 ndb_apply_table__create(thd);
1351 if (!ndb_apply_status_share)
1352 return false;
1353 }
1354 }
1355
1356 if (ndbcluster_find_all_databases(thd))
1357 {
1358 return false;
1359 }
1360
1361 if (!ndbcluster_find_all_files(thd))
1362 {
1363 ndb_binlog_tables_inited= TRUE;
1364 if (ndb_binlog_tables_inited &&
1365 ndb_binlog_running && ndb_binlog_is_ready)
1366 {
1367 if (opt_ndb_extra_logging)
1368 sql_print_information("NDB Binlog: ndb tables writable");
1369 close_cached_tables(NULL, NULL, TRUE, FALSE, FALSE);
1370
1371 /*
1372 Signal any waiting thread that ndb table setup is
1373 now complete
1374 */
1375 ndb_notify_tables_writable();
1376 }
1377 /* Signal injector thread that all is setup */
1378 pthread_cond_signal(&injector_cond);
1379
1380 return true; // Setup completed -> OK
1381 }
1382 return false;
1383 }
1384
1385 /*
1386 Defines and struct for schema table.
1387 Should reflect table definition above.
1388 */
1389 #define SCHEMA_DB_I 0u
1390 #define SCHEMA_NAME_I 1u
1391 #define SCHEMA_SLOCK_I 2u
1392 #define SCHEMA_QUERY_I 3u
1393 #define SCHEMA_NODE_ID_I 4u
1394 #define SCHEMA_EPOCH_I 5u
1395 #define SCHEMA_ID_I 6u
1396 #define SCHEMA_VERSION_I 7u
1397 #define SCHEMA_TYPE_I 8u
1398 #define SCHEMA_SIZE 9u
1399 #define SCHEMA_SLOCK_SIZE 32u
1400
/*
  In-memory copy of one mysql.ndb_schema row, filled in by
  ndbcluster_get_schema() from the event's shadow table.
*/
struct Cluster_schema
{
  uchar db_length;                    /* length of db (varchar length byte) */
  char db[64];                        /* database name, NUL terminated */
  uchar name_length;                  /* length of name */
  char name[64];                      /* table name, NUL terminated */
  uchar slock_length;                 /* bytes copied into slock */
  uint32 slock[SCHEMA_SLOCK_SIZE/4];  /* raw slock column bitmap */
  unsigned short query_length;        /* length of query */
  char *query;                        /* query copy made with sql_strmake() */
  Uint64 epoch;
  uint32 node_id;
  uint32 id;
  uint32 version;
  uint32 type;                        /* value of the type column (SOT_*) */
  uint32 any_value;                   /* not filled by ndbcluster_get_schema() */
};
1418
1419 static void
print_could_not_discover_error(THD * thd,const Cluster_schema * schema)1420 print_could_not_discover_error(THD *thd,
1421 const Cluster_schema *schema)
1422 {
1423 sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
1424 "binlog schema event '%s' from node %d. "
1425 "my_errno: %d",
1426 schema->db, schema->name, schema->query,
1427 schema->node_id, my_errno);
1428 print_warning_list("NDB Binlog", thd);
1429 }
1430
1431
1432 /*
1433 Transfer schema table data into corresponding struct
1434 */
ndbcluster_get_schema(Ndb_event_data * event_data,Cluster_schema * s)1435 static void ndbcluster_get_schema(Ndb_event_data *event_data,
1436 Cluster_schema *s)
1437 {
1438 TABLE *table= event_data->shadow_table;
1439 Field **field;
1440 /* unpack blob values */
1441 uchar* blobs_buffer= 0;
1442 uint blobs_buffer_size= 0;
1443 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
1444 {
1445 ptrdiff_t ptrdiff= 0;
1446 int ret= get_ndb_blobs_value(table, event_data->ndb_value[0],
1447 blobs_buffer, blobs_buffer_size,
1448 ptrdiff);
1449 if (ret != 0)
1450 {
1451 my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
1452 DBUG_PRINT("info", ("blob read error"));
1453 DBUG_ASSERT(FALSE);
1454 }
1455 }
1456 /* db varchar 1 length uchar */
1457 field= table->field;
1458 s->db_length= *(uint8*)(*field)->ptr;
1459 DBUG_ASSERT(s->db_length <= (*field)->field_length);
1460 DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
1461 memcpy(s->db, (*field)->ptr + 1, s->db_length);
1462 s->db[s->db_length]= 0;
1463 /* name varchar 1 length uchar */
1464 field++;
1465 s->name_length= *(uint8*)(*field)->ptr;
1466 DBUG_ASSERT(s->name_length <= (*field)->field_length);
1467 DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
1468 memcpy(s->name, (*field)->ptr + 1, s->name_length);
1469 s->name[s->name_length]= 0;
1470 /* slock fixed length */
1471 field++;
1472 s->slock_length= (*field)->field_length;
1473 DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
1474 memcpy(s->slock, (*field)->ptr, s->slock_length);
1475 /* query blob */
1476 field++;
1477 {
1478 Field_blob *field_blob= (Field_blob*)(*field);
1479 uint blob_len= field_blob->get_length((*field)->ptr);
1480 uchar *blob_ptr= 0;
1481 field_blob->get_ptr(&blob_ptr);
1482 DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
1483 s->query_length= blob_len;
1484 s->query= sql_strmake((char*) blob_ptr, blob_len);
1485 }
1486 /* node_id */
1487 field++;
1488 s->node_id= (Uint32)((Field_long *)*field)->val_int();
1489 /* epoch */
1490 field++;
1491 s->epoch= ((Field_long *)*field)->val_int();
1492 /* id */
1493 field++;
1494 s->id= (Uint32)((Field_long *)*field)->val_int();
1495 /* version */
1496 field++;
1497 s->version= (Uint32)((Field_long *)*field)->val_int();
1498 /* type */
1499 field++;
1500 s->type= (Uint32)((Field_long *)*field)->val_int();
1501 /* free blobs buffer */
1502 my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
1503 dbug_tmp_restore_column_map(table->read_set, old_map);
1504 }
1505
1506 /*
1507 helper function to pack a ndb varchar
1508 */
ndb_pack_varchar(const NDBCOL * col,char * buf,const char * str,int sz)1509 char *ndb_pack_varchar(const NDBCOL *col, char *buf,
1510 const char *str, int sz)
1511 {
1512 switch (col->getArrayType())
1513 {
1514 case NDBCOL::ArrayTypeFixed:
1515 memcpy(buf, str, sz);
1516 break;
1517 case NDBCOL::ArrayTypeShortVar:
1518 *(uchar*)buf= (uchar)sz;
1519 memcpy(buf + 1, str, sz);
1520 break;
1521 case NDBCOL::ArrayTypeMediumVar:
1522 int2store(buf, sz);
1523 memcpy(buf + 2, str, sz);
1524 break;
1525 }
1526 return buf;
1527 }
1528
1529 /*
1530 acknowledge handling of schema operation
1531 */
1532 static int
ndbcluster_update_slock(THD * thd,const char * db,const char * table_name,uint32 table_id,uint32 table_version)1533 ndbcluster_update_slock(THD *thd,
1534 const char *db,
1535 const char *table_name,
1536 uint32 table_id,
1537 uint32 table_version)
1538 {
1539 DBUG_ENTER("ndbcluster_update_slock");
1540 if (!ndb_schema_share)
1541 {
1542 DBUG_RETURN(0);
1543 }
1544
1545 const NdbError *ndb_error= 0;
1546 uint32 node_id= g_ndb_cluster_connection->node_id();
1547 Ndb *ndb= check_ndb_in_thd(thd);
1548 char save_db[FN_HEADLEN];
1549 strcpy(save_db, ndb->getDatabaseName());
1550
1551 char tmp_buf[FN_REFLEN];
1552 NDBDICT *dict= ndb->getDictionary();
1553 ndb->setDatabaseName(NDB_REP_DB);
1554 Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
1555 const NDBTAB *ndbtab= ndbtab_g.get_table();
1556 NdbTransaction *trans= 0;
1557 int retries= 100;
1558 int retry_sleep= 30; /* 30 milliseconds, transaction */
1559 const NDBCOL *col[SCHEMA_SIZE];
1560 unsigned sz[SCHEMA_SIZE];
1561
1562 MY_BITMAP slock;
1563 uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
1564 bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
1565
1566 if (ndbtab == 0)
1567 {
1568 if (dict->getNdbError().code != 4009)
1569 abort();
1570 DBUG_RETURN(0);
1571 }
1572
1573 {
1574 uint i;
1575 for (i= 0; i < SCHEMA_SIZE; i++)
1576 {
1577 col[i]= ndbtab->getColumn(i);
1578 if (i != SCHEMA_QUERY_I)
1579 {
1580 sz[i]= col[i]->getLength();
1581 DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
1582 }
1583 }
1584 }
1585
1586 while (1)
1587 {
1588 if ((trans= ndb->startTransaction()) == 0)
1589 goto err;
1590 {
1591 NdbOperation *op= 0;
1592 int r= 0;
1593
1594 /* read the bitmap exlusive */
1595 r|= (op= trans->getNdbOperation(ndbtab)) == 0;
1596 DBUG_ASSERT(r == 0);
1597 r|= op->readTupleExclusive();
1598 DBUG_ASSERT(r == 0);
1599
1600 /* db */
1601 ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
1602 r|= op->equal(SCHEMA_DB_I, tmp_buf);
1603 DBUG_ASSERT(r == 0);
1604 /* name */
1605 ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
1606 strlen(table_name));
1607 r|= op->equal(SCHEMA_NAME_I, tmp_buf);
1608 DBUG_ASSERT(r == 0);
1609 /* slock */
1610 r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
1611 DBUG_ASSERT(r == 0);
1612 }
1613 if (trans->execute(NdbTransaction::NoCommit))
1614 goto err;
1615
1616 if (opt_ndb_extra_logging > 19)
1617 {
1618 uint32 copy[SCHEMA_SLOCK_SIZE/4];
1619 memcpy(copy, bitbuf, sizeof(copy));
1620 bitmap_clear_bit(&slock, node_id);
1621 sql_print_information("NDB: reply to %s.%s(%u/%u) from %x%x to %x%x",
1622 db, table_name,
1623 table_id, table_version,
1624 copy[0], copy[1],
1625 slock.bitmap[0],
1626 slock.bitmap[1]);
1627 }
1628 else
1629 {
1630 bitmap_clear_bit(&slock, node_id);
1631 }
1632
1633 {
1634 NdbOperation *op= 0;
1635 int r= 0;
1636
1637 /* now update the tuple */
1638 r|= (op= trans->getNdbOperation(ndbtab)) == 0;
1639 DBUG_ASSERT(r == 0);
1640 r|= op->updateTuple();
1641 DBUG_ASSERT(r == 0);
1642
1643 /* db */
1644 ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, strlen(db));
1645 r|= op->equal(SCHEMA_DB_I, tmp_buf);
1646 DBUG_ASSERT(r == 0);
1647 /* name */
1648 ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
1649 strlen(table_name));
1650 r|= op->equal(SCHEMA_NAME_I, tmp_buf);
1651 DBUG_ASSERT(r == 0);
1652 /* slock */
1653 r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
1654 DBUG_ASSERT(r == 0);
1655 /* node_id */
1656 r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
1657 DBUG_ASSERT(r == 0);
1658 /* type */
1659 r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
1660 DBUG_ASSERT(r == 0);
1661 }
1662 if (trans->execute(NdbTransaction::Commit,
1663 NdbOperation::DefaultAbortOption, 1 /*force send*/) == 0)
1664 {
1665 DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
1666 node_id, db, table_name));
1667 dict->forceGCPWait(1);
1668 break;
1669 }
1670 err:
1671 const NdbError *this_error= trans ?
1672 &trans->getNdbError() : &ndb->getNdbError();
1673 if (this_error->status == NdbError::TemporaryError && !thd->killed)
1674 {
1675 if (retries--)
1676 {
1677 if (trans)
1678 ndb->closeTransaction(trans);
1679 do_retry_sleep(retry_sleep);
1680 continue; // retry
1681 }
1682 }
1683 ndb_error= this_error;
1684 break;
1685 }
1686
1687 if (ndb_error)
1688 {
1689 char buf[1024];
1690 my_snprintf(buf, sizeof(buf), "Could not release lock on '%s.%s'",
1691 db, table_name);
1692 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
1693 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
1694 ndb_error->code, ndb_error->message, buf);
1695 }
1696 if (trans)
1697 ndb->closeTransaction(trans);
1698 ndb->setDatabaseName(save_db);
1699 DBUG_RETURN(0);
1700 }
1701
1702
1703 /*
1704 log query in schema table
1705 */
ndb_report_waiting(const char * key,int the_time,const char * op,const char * obj,const MY_BITMAP * map)1706 static void ndb_report_waiting(const char *key,
1707 int the_time,
1708 const char *op,
1709 const char *obj,
1710 const MY_BITMAP * map)
1711 {
1712 ulonglong ndb_latest_epoch= 0;
1713 const char *proc_info= "<no info>";
1714 pthread_mutex_lock(&injector_mutex);
1715 if (injector_ndb)
1716 ndb_latest_epoch= injector_ndb->getLatestGCI();
1717 if (injector_thd)
1718 proc_info= injector_thd->proc_info;
1719 pthread_mutex_unlock(&injector_mutex);
1720 if (map == 0)
1721 {
1722 sql_print_information("NDB %s:"
1723 " waiting max %u sec for %s %s."
1724 " epochs: (%u/%u,%u/%u,%u/%u)"
1725 " injector proc_info: %s"
1726 ,key, the_time, op, obj
1727 ,(uint)(ndb_latest_handled_binlog_epoch >> 32)
1728 ,(uint)(ndb_latest_handled_binlog_epoch)
1729 ,(uint)(ndb_latest_received_binlog_epoch >> 32)
1730 ,(uint)(ndb_latest_received_binlog_epoch)
1731 ,(uint)(ndb_latest_epoch >> 32)
1732 ,(uint)(ndb_latest_epoch)
1733 ,proc_info
1734 );
1735 }
1736 else
1737 {
1738 sql_print_information("NDB %s:"
1739 " waiting max %u sec for %s %s."
1740 " epochs: (%u/%u,%u/%u,%u/%u)"
1741 " injector proc_info: %s map: %x%x"
1742 ,key, the_time, op, obj
1743 ,(uint)(ndb_latest_handled_binlog_epoch >> 32)
1744 ,(uint)(ndb_latest_handled_binlog_epoch)
1745 ,(uint)(ndb_latest_received_binlog_epoch >> 32)
1746 ,(uint)(ndb_latest_received_binlog_epoch)
1747 ,(uint)(ndb_latest_epoch >> 32)
1748 ,(uint)(ndb_latest_epoch)
1749 ,proc_info
1750 ,map->bitmap[0]
1751 ,map->bitmap[1]
1752 );
1753 }
1754 }
1755
1756 static
1757 const char*
get_schema_type_name(uint type)1758 get_schema_type_name(uint type)
1759 {
1760 switch(type){
1761 case SOT_DROP_TABLE:
1762 return "DROP_TABLE";
1763 case SOT_CREATE_TABLE:
1764 return "CREATE_TABLE";
1765 case SOT_RENAME_TABLE_NEW:
1766 return "RENAME_TABLE_NEW";
1767 case SOT_ALTER_TABLE_COMMIT:
1768 return "ALTER_TABLE_COMMIT";
1769 case SOT_DROP_DB:
1770 return "DROP_DB";
1771 case SOT_CREATE_DB:
1772 return "CREATE_DB";
1773 case SOT_ALTER_DB:
1774 return "ALTER_DB";
1775 case SOT_CLEAR_SLOCK:
1776 return "CLEAR_SLOCK";
1777 case SOT_TABLESPACE:
1778 return "TABLESPACE";
1779 case SOT_LOGFILE_GROUP:
1780 return "LOGFILE_GROUP";
1781 case SOT_RENAME_TABLE:
1782 return "RENAME_TABLE";
1783 case SOT_TRUNCATE_TABLE:
1784 return "TRUNCATE_TABLE";
1785 case SOT_RENAME_TABLE_PREPARE:
1786 return "RENAME_TABLE_PREPARE";
1787 case SOT_ONLINE_ALTER_TABLE_PREPARE:
1788 return "ONLINE_ALTER_TABLE_PREPARE";
1789 case SOT_ONLINE_ALTER_TABLE_COMMIT:
1790 return "ONLINE_ALTER_TABLE_COMMIT";
1791 case SOT_CREATE_USER:
1792 return "CREATE_USER";
1793 case SOT_DROP_USER:
1794 return "DROP_USER";
1795 case SOT_RENAME_USER:
1796 return "RENAME_USER";
1797 case SOT_GRANT:
1798 return "GRANT";
1799 case SOT_REVOKE:
1800 return "REVOKE";
1801 }
1802 return "<unknown>";
1803 }
1804
/*
  Defined in another file. Called by ndbcluster_log_schema_op() on slave
  threads when the schema operation is not logged to mysql.ndb_schema.
*/
extern void update_slave_api_stats(Ndb*);
1806
ndbcluster_log_schema_op(THD * thd,const char * query,int query_length,const char * db,const char * table_name,uint32 ndb_table_id,uint32 ndb_table_version,enum SCHEMA_OP_TYPE type,const char * new_db,const char * new_table_name)1807 int ndbcluster_log_schema_op(THD *thd,
1808 const char *query, int query_length,
1809 const char *db, const char *table_name,
1810 uint32 ndb_table_id,
1811 uint32 ndb_table_version,
1812 enum SCHEMA_OP_TYPE type,
1813 const char *new_db, const char *new_table_name)
1814 {
1815 DBUG_ENTER("ndbcluster_log_schema_op");
1816 Thd_ndb *thd_ndb= get_thd_ndb(thd);
1817 if (!thd_ndb)
1818 {
1819 if (!(thd_ndb= Thd_ndb::seize(thd)))
1820 {
1821 sql_print_error("Could not allocate Thd_ndb object");
1822 DBUG_RETURN(1);
1823 }
1824 thd_set_thd_ndb(thd, thd_ndb);
1825 }
1826
1827 DBUG_PRINT("enter",
1828 ("query: %s db: %s table_name: %s thd_ndb->options: %d",
1829 query, db, table_name, thd_ndb->options));
1830 if (!ndb_schema_share || thd_ndb->options & TNO_NO_LOG_SCHEMA_OP)
1831 {
1832 if (thd->slave_thread)
1833 update_slave_api_stats(thd_ndb->ndb);
1834
1835 DBUG_RETURN(0);
1836 }
1837
1838 char tmp_buf2[FN_REFLEN];
1839 char quoted_table1[2 + 2 * FN_REFLEN + 1];
1840 char quoted_db1[2 + 2 * FN_REFLEN + 1];
1841 char quoted_db2[2 + 2 * FN_REFLEN + 1];
1842 char quoted_table2[2 + 2 * FN_REFLEN + 1];
1843 int id_length= 0;
1844 const char *type_str;
1845 int also_internal= 0;
1846 uint32 log_type= (uint32)type;
1847 switch (type)
1848 {
1849 case SOT_DROP_TABLE:
1850 /* drop database command, do not log at drop table */
1851 if (thd->lex->sql_command == SQLCOM_DROP_DB)
1852 DBUG_RETURN(0);
1853 /* redo the drop table query as is may contain several tables */
1854 query= tmp_buf2;
1855 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
1856 table_name, 0);
1857 quoted_table1[id_length]= '\0';
1858 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db1,
1859 db, 0);
1860 quoted_db1[id_length]= '\0';
1861 query_length= (uint) (strxmov(tmp_buf2, "drop table ", quoted_db1, ".",
1862 quoted_table1, NullS) - tmp_buf2);
1863 type_str= "drop table";
1864 break;
1865 case SOT_RENAME_TABLE_PREPARE:
1866 type_str= "rename table prepare";
1867 also_internal= 1;
1868 break;
1869 case SOT_RENAME_TABLE:
1870 /* redo the rename table query as is may contain several tables */
1871 query= tmp_buf2;
1872 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db1,
1873 db, 0);
1874 quoted_db1[id_length]= '\0';
1875 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table1,
1876 table_name, 0);
1877 quoted_table1[id_length]= '\0';
1878 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_db2,
1879 new_db, 0);
1880 quoted_db2[id_length]= '\0';
1881 id_length= my_strmov_quoted_identifier (thd, (char *) quoted_table2,
1882 new_table_name, 0);
1883 quoted_table2[id_length]= '\0';
1884 query_length= (uint) (strxmov(tmp_buf2, "rename table ",
1885 quoted_db1, ".", quoted_table1, " to ",
1886 quoted_db2, ".", quoted_table2, NullS) - tmp_buf2);
1887 type_str= "rename table";
1888 break;
1889 case SOT_CREATE_TABLE:
1890 type_str= "create table";
1891 break;
1892 case SOT_ALTER_TABLE_COMMIT:
1893 type_str= "alter table";
1894 also_internal= 1;
1895 break;
1896 case SOT_ONLINE_ALTER_TABLE_PREPARE:
1897 type_str= "online alter table prepare";
1898 also_internal= 1;
1899 break;
1900 case SOT_ONLINE_ALTER_TABLE_COMMIT:
1901 type_str= "online alter table commit";
1902 also_internal= 1;
1903 break;
1904 case SOT_DROP_DB:
1905 type_str= "drop db";
1906 break;
1907 case SOT_CREATE_DB:
1908 type_str= "create db";
1909 break;
1910 case SOT_ALTER_DB:
1911 type_str= "alter db";
1912 break;
1913 case SOT_TABLESPACE:
1914 type_str= "tablespace";
1915 break;
1916 case SOT_LOGFILE_GROUP:
1917 type_str= "logfile group";
1918 break;
1919 case SOT_TRUNCATE_TABLE:
1920 type_str= "truncate table";
1921 break;
1922 case SOT_CREATE_USER:
1923 type_str= "create user";
1924 break;
1925 case SOT_DROP_USER:
1926 type_str= "drop user";
1927 break;
1928 case SOT_RENAME_USER:
1929 type_str= "rename user";
1930 break;
1931 case SOT_GRANT:
1932 type_str= "grant/revoke";
1933 break;
1934 case SOT_REVOKE:
1935 type_str= "revoke all";
1936 break;
1937 default:
1938 abort(); /* should not happen, programming error */
1939 }
1940
1941 NDB_SCHEMA_OBJECT *ndb_schema_object;
1942 {
1943 char key[FN_REFLEN + 1];
1944 build_table_filename(key, sizeof(key) - 1, db, table_name, "", 0);
1945 ndb_schema_object= ndb_get_schema_object(key, TRUE, FALSE);
1946 ndb_schema_object->table_id= ndb_table_id;
1947 ndb_schema_object->table_version= ndb_table_version;
1948 }
1949
1950 const NdbError *ndb_error= 0;
1951 uint32 node_id= g_ndb_cluster_connection->node_id();
1952 Uint64 epoch= 0;
1953 {
1954 int i;
1955 int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
1956
1957 /* begin protect ndb_schema_share */
1958 pthread_mutex_lock(&ndb_schema_share_mutex);
1959 if (ndb_schema_share == 0)
1960 {
1961 pthread_mutex_unlock(&ndb_schema_share_mutex);
1962 if (ndb_schema_object)
1963 ndb_free_schema_object(&ndb_schema_object, FALSE);
1964 DBUG_RETURN(0);
1965 }
1966 pthread_mutex_lock(&ndb_schema_share->mutex);
1967 for (i= 0; i < no_storage_nodes; i++)
1968 {
1969 bitmap_union(&ndb_schema_object->slock_bitmap,
1970 &ndb_schema_share->subscriber_bitmap[i]);
1971 }
1972 pthread_mutex_unlock(&ndb_schema_share->mutex);
1973 pthread_mutex_unlock(&ndb_schema_share_mutex);
1974 /* end protect ndb_schema_share */
1975
1976 if (also_internal)
1977 bitmap_set_bit(&ndb_schema_object->slock_bitmap, node_id);
1978 else
1979 bitmap_clear_bit(&ndb_schema_object->slock_bitmap, node_id);
1980
1981 DBUG_DUMP("schema_subscribers", (uchar*)&ndb_schema_object->slock,
1982 no_bytes_in_map(&ndb_schema_object->slock_bitmap));
1983 }
1984
1985 Ndb *ndb= thd_ndb->ndb;
1986 char save_db[FN_REFLEN];
1987 strcpy(save_db, ndb->getDatabaseName());
1988
1989 char tmp_buf[FN_REFLEN];
1990 NDBDICT *dict= ndb->getDictionary();
1991 ndb->setDatabaseName(NDB_REP_DB);
1992 Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
1993 const NDBTAB *ndbtab= ndbtab_g.get_table();
1994 NdbTransaction *trans= 0;
1995 int retries= 100;
1996 int retry_sleep= 30; /* 30 milliseconds, transaction */
1997 const NDBCOL *col[SCHEMA_SIZE];
1998 unsigned sz[SCHEMA_SIZE];
1999
2000 if (ndbtab == 0)
2001 {
2002 if (strcmp(NDB_REP_DB, db) != 0 ||
2003 strcmp(NDB_SCHEMA_TABLE, table_name))
2004 {
2005 ndb_error= &dict->getNdbError();
2006 }
2007 goto end;
2008 }
2009
2010 {
2011 uint i;
2012 for (i= 0; i < SCHEMA_SIZE; i++)
2013 {
2014 col[i]= ndbtab->getColumn(i);
2015 if (i != SCHEMA_QUERY_I)
2016 {
2017 sz[i]= col[i]->getLength();
2018 DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
2019 }
2020 }
2021 }
2022
2023 while (1)
2024 {
2025 const char *log_db= db;
2026 const char *log_tab= table_name;
2027 const char *log_subscribers= (char*)ndb_schema_object->slock;
2028 if ((trans= ndb->startTransaction()) == 0)
2029 goto err;
2030 while (1)
2031 {
2032 NdbOperation *op= 0;
2033 int r= 0;
2034 r|= (op= trans->getNdbOperation(ndbtab)) == 0;
2035 DBUG_ASSERT(r == 0);
2036 r|= op->writeTuple();
2037 DBUG_ASSERT(r == 0);
2038
2039 /* db */
2040 ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, log_db, strlen(log_db));
2041 r|= op->equal(SCHEMA_DB_I, tmp_buf);
2042 DBUG_ASSERT(r == 0);
2043 /* name */
2044 ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, log_tab,
2045 strlen(log_tab));
2046 r|= op->equal(SCHEMA_NAME_I, tmp_buf);
2047 DBUG_ASSERT(r == 0);
2048 /* slock */
2049 DBUG_ASSERT(sz[SCHEMA_SLOCK_I] ==
2050 no_bytes_in_map(&ndb_schema_object->slock_bitmap));
2051 r|= op->setValue(SCHEMA_SLOCK_I, log_subscribers);
2052 DBUG_ASSERT(r == 0);
2053 /* query */
2054 {
2055 NdbBlob *ndb_blob= op->getBlobHandle(SCHEMA_QUERY_I);
2056 DBUG_ASSERT(ndb_blob != 0);
2057 uint blob_len= query_length;
2058 const char* blob_ptr= query;
2059 r|= ndb_blob->setValue(blob_ptr, blob_len);
2060 DBUG_ASSERT(r == 0);
2061 }
2062 /* node_id */
2063 r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
2064 DBUG_ASSERT(r == 0);
2065 /* epoch */
2066 r|= op->setValue(SCHEMA_EPOCH_I, epoch);
2067 DBUG_ASSERT(r == 0);
2068 /* id */
2069 r|= op->setValue(SCHEMA_ID_I, ndb_table_id);
2070 DBUG_ASSERT(r == 0);
2071 /* version */
2072 r|= op->setValue(SCHEMA_VERSION_I, ndb_table_version);
2073 DBUG_ASSERT(r == 0);
2074 /* type */
2075 r|= op->setValue(SCHEMA_TYPE_I, log_type);
2076 DBUG_ASSERT(r == 0);
2077 /* any value */
2078 Uint32 anyValue = 0;
2079 if (! thd->slave_thread)
2080 {
2081 /* Schema change originating from this MySQLD, check SQL_LOG_BIN
2082 * variable and pass 'setting' to all logging MySQLDs via AnyValue
2083 */
2084 if (thd_options(thd) & OPTION_BIN_LOG) /* e.g. SQL_LOG_BIN == on */
2085 {
2086 DBUG_PRINT("info", ("Schema event for binlogging"));
2087 ndbcluster_anyvalue_set_normal(anyValue);
2088 }
2089 else
2090 {
2091 DBUG_PRINT("info", ("Schema event not for binlogging"));
2092 ndbcluster_anyvalue_set_nologging(anyValue);
2093 }
2094 }
2095 else
2096 {
2097 /*
2098 Slave propagating replicated schema event in ndb_schema
2099 In case replicated serverId is composite
2100 (server-id-bits < 31) we copy it into the
2101 AnyValue as-is
2102 This is for 'future', as currently Schema operations
2103 do not have composite AnyValues.
2104 In future it may be useful to support *not* mapping composite
2105 AnyValues to/from Binlogged server-ids.
2106 */
2107 DBUG_PRINT("info", ("Replicated schema event with original server id %d",
2108 thd->server_id));
2109 anyValue = thd_unmasked_server_id(thd);
2110 }
2111
2112 #ifndef DBUG_OFF
2113 /*
2114 MySQLD will set the user-portion of AnyValue (if any) to all 1s
2115 This tests code filtering ServerIds on the value of server-id-bits.
2116 */
2117 const char* p = getenv("NDB_TEST_ANYVALUE_USERDATA");
2118 if (p != 0 && *p != 0 && *p != '0' && *p != 'n' && *p != 'N')
2119 {
2120 dbug_ndbcluster_anyvalue_set_userbits(anyValue);
2121 }
2122 #endif
2123
2124 r|= op->setAnyValue(anyValue);
2125 DBUG_ASSERT(r == 0);
2126 break;
2127 }
2128 if (trans->execute(NdbTransaction::Commit, NdbOperation::DefaultAbortOption,
2129 1 /* force send */) == 0)
2130 {
2131 DBUG_PRINT("info", ("logged: %s", query));
2132 dict->forceGCPWait(1);
2133 break;
2134 }
2135 err:
2136 const NdbError *this_error= trans ?
2137 &trans->getNdbError() : &ndb->getNdbError();
2138 if (this_error->status == NdbError::TemporaryError && !thd->killed)
2139 {
2140 if (retries--)
2141 {
2142 if (trans)
2143 ndb->closeTransaction(trans);
2144 do_retry_sleep(retry_sleep);
2145 continue; // retry
2146 }
2147 }
2148 ndb_error= this_error;
2149 break;
2150 }
2151 end:
2152 if (ndb_error)
2153 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
2154 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
2155 ndb_error->code,
2156 ndb_error->message,
2157 "Could not log query '%s' on other mysqld's");
2158
2159 if (trans)
2160 ndb->closeTransaction(trans);
2161 ndb->setDatabaseName(save_db);
2162
2163 if (opt_ndb_extra_logging > 19)
2164 {
2165 sql_print_information("NDB: distributed %s.%s(%u/%u) type: %s(%u) query: \'%s\' to %x%x",
2166 db,
2167 table_name,
2168 ndb_table_id,
2169 ndb_table_version,
2170 get_schema_type_name(log_type),
2171 log_type,
2172 query,
2173 ndb_schema_object->slock_bitmap.bitmap[0],
2174 ndb_schema_object->slock_bitmap.bitmap[1]);
2175 }
2176
2177 /*
2178 Wait for other mysqld's to acknowledge the table operation
2179 */
2180 if (ndb_error == 0 && !bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
2181 {
2182 int max_timeout= DEFAULT_SYNC_TIMEOUT;
2183 pthread_mutex_lock(&ndb_schema_object->mutex);
2184 while (1)
2185 {
2186 struct timespec abstime;
2187 int i;
2188 int no_storage_nodes= g_ndb_cluster_connection->no_db_nodes();
2189 set_timespec(abstime, 1);
2190 int ret= pthread_cond_timedwait(&injector_cond,
2191 &ndb_schema_object->mutex,
2192 &abstime);
2193 if (thd->killed)
2194 break;
2195
2196 /* begin protect ndb_schema_share */
2197 pthread_mutex_lock(&ndb_schema_share_mutex);
2198 if (ndb_schema_share == 0)
2199 {
2200 pthread_mutex_unlock(&ndb_schema_share_mutex);
2201 break;
2202 }
2203 MY_BITMAP servers;
2204 bitmap_init(&servers, 0, 256, FALSE);
2205 bitmap_clear_all(&servers);
2206 bitmap_set_bit(&servers, node_id); // "we" are always alive
2207 pthread_mutex_lock(&ndb_schema_share->mutex);
2208 for (i= 0; i < no_storage_nodes; i++)
2209 {
2210 /* remove any unsubscribed from schema_subscribers */
2211 MY_BITMAP *tmp= &ndb_schema_share->subscriber_bitmap[i];
2212 bitmap_union(&servers, tmp);
2213 }
2214 pthread_mutex_unlock(&ndb_schema_share->mutex);
2215 pthread_mutex_unlock(&ndb_schema_share_mutex);
2216 /* end protect ndb_schema_share */
2217
2218 /* remove any unsubscribed from ndb_schema_object->slock */
2219 bitmap_intersect(&ndb_schema_object->slock_bitmap, &servers);
2220 bitmap_free(&servers);
2221
2222 if (bitmap_is_clear_all(&ndb_schema_object->slock_bitmap))
2223 break;
2224
2225 if (ret)
2226 {
2227 max_timeout--;
2228 if (max_timeout == 0)
2229 {
2230 sql_print_error("NDB %s: distributing %s timed out. Ignoring...",
2231 type_str, ndb_schema_object->key);
2232 DBUG_ASSERT(false);
2233 break;
2234 }
2235 if (opt_ndb_extra_logging)
2236 ndb_report_waiting(type_str, max_timeout,
2237 "distributing", ndb_schema_object->key,
2238 &ndb_schema_object->slock_bitmap);
2239 }
2240 }
2241 pthread_mutex_unlock(&ndb_schema_object->mutex);
2242 }
2243 else if (ndb_error)
2244 {
2245 sql_print_error("NDB %s: distributing %s err: %u",
2246 type_str, ndb_schema_object->key,
2247 ndb_error->code);
2248 }
2249 else if (opt_ndb_extra_logging > 19)
2250 {
2251 sql_print_information("NDB %s: not waiting for distributing %s",
2252 type_str, ndb_schema_object->key);
2253 }
2254
2255 if (ndb_schema_object)
2256 ndb_free_schema_object(&ndb_schema_object, FALSE);
2257
2258 if (opt_ndb_extra_logging > 19)
2259 {
2260 sql_print_information("NDB: distribution of %s.%s(%u/%u) type: %s(%u) query: \'%s\'"
2261 " - complete!",
2262 db,
2263 table_name,
2264 ndb_table_id,
2265 ndb_table_version,
2266 get_schema_type_name(log_type),
2267 log_type,
2268 query);
2269 }
2270
2271 if (thd->slave_thread)
2272 update_slave_api_stats(ndb);
2273
2274 DBUG_RETURN(0);
2275 }
2276
2277 /*
2278 Handle _non_ data events from the storage nodes
2279 */
2280
2281 int
ndb_handle_schema_change(THD * thd,Ndb * is_ndb,NdbEventOperation * pOp,Ndb_event_data * event_data)2282 ndb_handle_schema_change(THD *thd, Ndb *is_ndb, NdbEventOperation *pOp,
2283 Ndb_event_data *event_data)
2284 {
2285 DBUG_ENTER("ndb_handle_schema_change");
2286 NDB_SHARE *share= event_data->share;
2287 TABLE *shadow_table= event_data->shadow_table;
2288 const char *tabname= shadow_table->s->table_name.str;
2289 const char *dbname= shadow_table->s->db.str;
2290 bool do_close_cached_tables= FALSE;
2291 bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId());
2292
2293 if (pOp->getEventType() == NDBEVENT::TE_ALTER)
2294 {
2295 DBUG_RETURN(0);
2296 }
2297 DBUG_ASSERT(pOp->getEventType() == NDBEVENT::TE_DROP ||
2298 pOp->getEventType() == NDBEVENT::TE_CLUSTER_FAILURE);
2299 {
2300 Thd_ndb *thd_ndb= get_thd_ndb(thd);
2301 Ndb *ndb= thd_ndb->ndb;
2302 NDBDICT *dict= ndb->getDictionary();
2303 ndb->setDatabaseName(dbname);
2304 Ndb_table_guard ndbtab_g(dict, tabname);
2305 const NDBTAB *ev_tab= pOp->getTable();
2306 const NDBTAB *cache_tab= ndbtab_g.get_table();
2307 if (cache_tab &&
2308 cache_tab->getObjectId() == ev_tab->getObjectId() &&
2309 cache_tab->getObjectVersion() <= ev_tab->getObjectVersion())
2310 ndbtab_g.invalidate();
2311 }
2312
2313 pthread_mutex_lock(&share->mutex);
2314 DBUG_ASSERT(share->state == NSS_DROPPED ||
2315 share->op == pOp || share->new_op == pOp);
2316 if (share->new_op)
2317 {
2318 share->new_op= 0;
2319 }
2320 if (share->op)
2321 {
2322 share->op= 0;
2323 }
2324 // either just us or drop table handling as well
2325
2326 /* Signal ha_ndbcluster::delete/rename_table that drop is done */
2327 pthread_mutex_unlock(&share->mutex);
2328 (void) pthread_cond_signal(&injector_cond);
2329
2330 pthread_mutex_lock(&ndbcluster_mutex);
2331 /* ndb_share reference binlog free */
2332 DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
2333 share->key, share->use_count));
2334 free_share(&share, TRUE);
2335 if (is_remote_change && share && share->state != NSS_DROPPED)
2336 {
2337 DBUG_PRINT("info", ("remote change"));
2338 share->state= NSS_DROPPED;
2339 if (share->use_count != 1)
2340 {
2341 /* open handler holding reference */
2342 /* wait with freeing create ndb_share to below */
2343 do_close_cached_tables= TRUE;
2344 }
2345 else
2346 {
2347 /* ndb_share reference create free */
2348 DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
2349 share->key, share->use_count));
2350 free_share(&share, TRUE);
2351 share= 0;
2352 }
2353 }
2354 else
2355 share= 0;
2356 pthread_mutex_unlock(&ndbcluster_mutex);
2357
2358 if (event_data)
2359 {
2360 delete event_data;
2361 pOp->setCustomData(NULL);
2362 }
2363
2364 pthread_mutex_lock(&injector_mutex);
2365 is_ndb->dropEventOperation(pOp);
2366 pOp= 0;
2367 pthread_mutex_unlock(&injector_mutex);
2368
2369 if (do_close_cached_tables)
2370 {
2371 TABLE_LIST table_list;
2372 memset(&table_list, 0, sizeof(table_list));
2373 table_list.db= (char *)dbname;
2374 table_list.alias= table_list.table_name= (char *)tabname;
2375 close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
2376 /* ndb_share reference create free */
2377 DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
2378 share->key, share->use_count));
2379 free_share(&share);
2380 }
2381 DBUG_RETURN(0);
2382 }
2383
ndb_binlog_query(THD * thd,Cluster_schema * schema)2384 static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
2385 {
2386 /* any_value == 0 means local cluster sourced change that
2387 * should be logged
2388 */
2389 if (ndbcluster_anyvalue_is_reserved(schema->any_value))
2390 {
2391 /* Originating SQL node did not want this query logged */
2392 if (!ndbcluster_anyvalue_is_nologging(schema->any_value))
2393 sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
2394 "query not logged",
2395 schema->any_value);
2396 return;
2397 }
2398
2399 Uint32 queryServerId = ndbcluster_anyvalue_get_serverid(schema->any_value);
2400 /*
2401 Start with serverId as received AnyValue, in case it's a composite
2402 (server_id_bits < 31).
2403 This is for 'future', as currently schema ops do not have composite
2404 AnyValues.
2405 In future it may be useful to support *not* mapping composite
2406 AnyValues to/from Binlogged server-ids.
2407 */
2408 Uint32 loggedServerId = schema->any_value;
2409
2410 if (queryServerId)
2411 {
2412 /*
2413 AnyValue has non-zero serverId, must be a query applied by a slave
2414 mysqld.
2415 TODO : Assert that we are running in the Binlog injector thread?
2416 */
2417 if (! g_ndb_log_slave_updates)
2418 {
2419 /* This MySQLD does not log slave updates */
2420 return;
2421 }
2422 }
2423 else
2424 {
2425 /* No ServerId associated with this query, mark it as ours */
2426 ndbcluster_anyvalue_set_serverid(loggedServerId, ::server_id);
2427 }
2428
2429 uint32 thd_server_id_save= thd->server_id;
2430 DBUG_ASSERT(sizeof(thd_server_id_save) == sizeof(thd->server_id));
2431 char *thd_db_save= thd->db;
2432 thd->server_id = loggedServerId;
2433 thd->db= schema->db;
2434 int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
2435 thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
2436 schema->query_length, FALSE,
2437 #ifdef NDB_THD_BINLOG_QUERY_HAS_DIRECT
2438 TRUE,
2439 #endif
2440 schema->name[0] == 0 || thd->db[0] == 0,
2441 errcode);
2442 thd->server_id= thd_server_id_save;
2443 thd->db= thd_db_save;
2444 }
2445
/*
  Handle one event received on the ndb_schema table's event operation.

  Only events whose share is ndb_schema_share are processed; anything else
  is ignored.

  TE_INSERT / TE_UPDATE
    - Meaning: a schema operation distributed by some mysqld.
    - The row is read into a Cluster_schema allocated with sql_alloc().
    - SOT_CLEAR_SLOCK, SOT_ALTER_TABLE_COMMIT, SOT_RENAME_TABLE_PREPARE and
      the SOT_ONLINE_ALTER_TABLE_* types are only queued on
      post_epoch_log_list (and, except CLEAR_SLOCK, on
      post_epoch_unlock_list) for after the epoch.
    - Other operations issued by a different node are applied locally
      (run_query(), table discovery, dictionary/table cache invalidation).
      They may then be binlogged via ndb_binlog_query().
    - They are acknowledged either immediately via ndbcluster_update_slock()
      or after the epoch via post_epoch_unlock_list, and only when this
      node's bit is set in the row's slock.
  TE_CLUSTER_FAILURE / TE_DROP
    Release ndb_schema_share, mark binlog tables as not inited/ready, close
    all cached tables, then call ndb_handle_schema_change().
  TE_ALTER
    Passed to ndb_handle_schema_change() (which returns early for it).
  TE_NODE_FAILURE / TE_SUBSCRIBE / TE_UNSUBSCRIBE
    Update the per data node subscriber bitmaps of the share and signal
    injector_cond.

  @param thd                     binlog thread THD
  @param s_ndb                   Ndb object owning pOp
  @param pOp                     event operation holding the current event
  @param post_epoch_log_list     ops to process/binlog after the epoch
  @param post_epoch_unlock_list  ops to acknowledge after the epoch
  @param mem_root                memory for the list nodes

  @return always 0
*/
static int
ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *s_ndb,
                                      NdbEventOperation *pOp,
                                      List<Cluster_schema>
                                      *post_epoch_log_list,
                                      List<Cluster_schema>
                                      *post_epoch_unlock_list,
                                      MEM_ROOT *mem_root)
{
  DBUG_ENTER("ndb_binlog_thread_handle_schema_event");
  Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
  NDB_SHARE *tmp_share= event_data->share;
  /* Only events on the ndb_schema table itself are handled here */
  if (tmp_share && ndb_schema_share == tmp_share)
  {
    NDBEVENT::TableEvent ev_type= pOp->getEventType();
    DBUG_PRINT("enter", ("%s.%s ev_type: %d",
                         tmp_share->db, tmp_share->table_name, ev_type));
    if (ev_type == NDBEVENT::TE_UPDATE ||
        ev_type == NDBEVENT::TE_INSERT)
    {
      Thd_ndb *thd_ndb= get_thd_ndb(thd);
      Ndb *ndb= thd_ndb->ndb;
      NDBDICT *dict= ndb->getDictionary();
      /*
        Holds options set below via thd_ndb_options.set(); presumably they
        are reset when the guard goes out of scope -- TODO confirm
      */
      Thd_ndb_options_guard thd_ndb_options(thd_ndb);
      /*
        The Cluster_schema may be kept on the post-epoch lists.
        NOTE(review): the sql_alloc() result is not checked for NULL.
      */
      Cluster_schema *schema= (Cluster_schema *)
        sql_alloc(sizeof(Cluster_schema));
      /* Bitmap view over schema->slock (filled by ndbcluster_get_schema()) */
      MY_BITMAP slock;
      bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, FALSE);
      uint node_id= g_ndb_cluster_connection->node_id();
      {
        ndbcluster_get_schema(event_data, schema);
        schema->any_value= pOp->getAnyValue();
      }
      enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
      DBUG_PRINT("info",
                 ("%s.%s: log query_length: %d query: '%s' type: %d",
                  schema->db, schema->name,
                  schema->query_length, schema->query,
                  schema_type));

      if (opt_ndb_extra_logging > 19)
      {
        sql_print_information("NDB: got schema event on %s.%s(%u/%u) query: '%s' type: %s(%d) node: %u slock: %x%x",
                              schema->db, schema->name,
                              schema->id, schema->version,
                              schema->query,
                              get_schema_type_name(schema_type),
                              schema_type,
                              schema->node_id,
                              slock.bitmap[0], slock.bitmap[1]);
      }

      /* Rows naming neither a database nor a table are ignored */
      if ((schema->db[0] == 0) && (schema->name[0] == 0))
        DBUG_RETURN(0);
      /* Operation types that are handled entirely after the epoch */
      switch (schema_type)
      {
      case SOT_CLEAR_SLOCK:
        /*
          handle slock after epoch is completed to ensure that
          schema events get inserted in the binlog after any data
          events
        */
        post_epoch_log_list->push_back(schema, mem_root);
        DBUG_RETURN(0);
      case SOT_ALTER_TABLE_COMMIT:
        // fall through
      case SOT_RENAME_TABLE_PREPARE:
        // fall through
      case SOT_ONLINE_ALTER_TABLE_PREPARE:
        // fall through
      case SOT_ONLINE_ALTER_TABLE_COMMIT:
        post_epoch_log_list->push_back(schema, mem_root);
        post_epoch_unlock_list->push_back(schema, mem_root);
        DBUG_RETURN(0);
        break;
      default:
        break;
      }

      /*
        Operations issued by this node itself get no local apply, binlog or
        acknowledge here
      */
      if (schema->node_id != node_id)
      {
        /*
          log_query:         binlog the query now via ndb_binlog_query()
          post_epoch_unlock: acknowledge after the epoch instead of now
        */
        int log_query= 0, post_epoch_unlock= 0;
        char errmsg[MYSQL_ERRMSG_SIZE];

        switch (schema_type)
        {
        /*
          RENAME_TABLE / RENAME_TABLE_NEW share the DROP_TABLE handling
          below; they only prepare a different message for the case where
          a local (non-NDB) table of that name exists.
        */
        case SOT_RENAME_TABLE:
          // fall through
        case SOT_RENAME_TABLE_NEW:
        {
          uint end= my_snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
                                "NDB Binlog: Skipping renaming locally "
                                "defined table '%s.%s' from binlog schema "
                                "event '%s' from node %d. ",
                                schema->db, schema->name, schema->query,
                                schema->node_id);
          errmsg[end]= '\0';
        }
        // fall through
        case SOT_DROP_TABLE:
          if (schema_type == SOT_DROP_TABLE)
          {
            uint end= my_snprintf(&errmsg[0], MYSQL_ERRMSG_SIZE,
                                  "NDB Binlog: Skipping dropping locally "
                                  "defined table '%s.%s' from binlog schema "
                                  "event '%s' from node %d. ",
                                  schema->db, schema->name, schema->query,
                                  schema->node_id);
            errmsg[end]= '\0';
          }
          if (! ndbcluster_check_if_local_table(schema->db, schema->name))
          {
            thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
            const int no_print_error[2]=
              {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */
            run_query(thd, schema->query,
                      schema->query + schema->query_length,
                      no_print_error);
            /* binlog dropping table after any table operations */
            post_epoch_log_list->push_back(schema, mem_root);
            /* acknowledge this query _after_ epoch completion */
            post_epoch_unlock= 1;
          }
          else
          {
            /* Tables exists as a local table, leave it */
            DBUG_PRINT("info", ("%s", errmsg));
            sql_print_error("%s", errmsg);
            log_query= 1;
          }
          // Fall through
        case SOT_TRUNCATE_TABLE:
        {
          /*
            Invalidate the dictionary entry and close cached TABLEs, unless
            the share has an event operation (then the binlog thread
            already handles invalidation)
          */
          char key[FN_REFLEN + 1];
          build_table_filename(key, sizeof(key) - 1,
                               schema->db, schema->name, "", 0);
          /* ndb_share reference temporary, free below */
          NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
          if (share)
          {
            DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
                                     share->key, share->use_count));
          }
          // invalidation already handled by binlog thread
          if (!share || !share->op)
          {
            {
              ndb->setDatabaseName(schema->db);
              Ndb_table_guard ndbtab_g(dict, schema->name);
              ndbtab_g.invalidate();
            }
            TABLE_LIST table_list;
            memset(&table_list, 0, sizeof(table_list));
            table_list.db= schema->db;
            table_list.alias= table_list.table_name= schema->name;
            close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
          }
          /* ndb_share reference temporary free */
          if (share)
          {
            DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
                                     share->key, share->use_count));
            free_share(&share);
          }
        }
        /* Only TRUNCATE continues into re-creating the table */
        if (schema_type != SOT_TRUNCATE_TABLE)
          break;
        // fall through
        case SOT_CREATE_TABLE:
          /* Discover the table from NDB unless a local table shadows it */
          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
          if (ndbcluster_check_if_local_table(schema->db, schema->name))
          {
            DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
                                schema->db, schema->name));
            sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
                            "binlog schema event '%s' from node %d. ",
                            schema->db, schema->name, schema->query,
                            schema->node_id);
          }
          else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
          {
            print_could_not_discover_error(thd, schema);
          }
          log_query= 1;
          break;
        case SOT_DROP_DB:
          /* Drop the database locally if it only contains ndb tables */
          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
          if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db))
          {
            const int no_print_error[1]= {0};
            run_query(thd, schema->query,
                      schema->query + schema->query_length,
                      no_print_error);
            /* binlog dropping database after any table operations */
            post_epoch_log_list->push_back(schema, mem_root);
            /* acknowledge this query _after_ epoch completion */
            post_epoch_unlock= 1;
          }
          else
          {
            /* Database contained local tables, leave it */
            sql_print_error("NDB Binlog: Skipping drop database '%s' since it contained local tables "
                            "binlog schema event '%s' from node %d. ",
                            schema->db, schema->query,
                            schema->node_id);
            log_query= 1;
          }
          break;
        case SOT_CREATE_DB:
          if (opt_ndb_extra_logging > 9)
            sql_print_information("SOT_CREATE_DB %s", schema->db);

          /* fall through */
        case SOT_ALTER_DB:
        {
          /* Re-run the CREATE/ALTER DATABASE query locally */
          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
          const int no_print_error[1]= {0};
          run_query(thd, schema->query,
                    schema->query + schema->query_length,
                    no_print_error);
          log_query= 1;
          break;
        }
        case SOT_CREATE_USER:
        case SOT_DROP_USER:
        case SOT_RENAME_USER:
        case SOT_GRANT:
        case SOT_REVOKE:
        {
          /* Distributed privilege change: reload privileges locally */
          if (opt_ndb_extra_logging > 9)
            sql_print_information("Got dist_priv event: %s, "
                                  "flushing privileges",
                                  get_schema_type_name(schema_type));

          thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
          const int no_print_error[1]= {0};
          char *cmd= (char *) "flush privileges";
          run_query(thd, cmd,
                    cmd + strlen(cmd),
                    no_print_error);
          log_query= 1;
          break;
        }
        case SOT_TABLESPACE:
        case SOT_LOGFILE_GROUP:
          log_query= 1;
          break;
        /* Already returned above; reaching here is a logic error */
        case SOT_ALTER_TABLE_COMMIT:
        case SOT_RENAME_TABLE_PREPARE:
        case SOT_ONLINE_ALTER_TABLE_PREPARE:
        case SOT_ONLINE_ALTER_TABLE_COMMIT:
        case SOT_CLEAR_SLOCK:
          abort();
        }
        if (log_query && ndb_binlog_running)
          ndb_binlog_query(thd, schema);
        /* signal that schema operation has been handled */
        DBUG_DUMP("slock", (uchar*) schema->slock, schema->slock_length);
        /* Acknowledge only if the originator is waiting on this node */
        if (bitmap_is_set(&slock, node_id))
        {
          if (post_epoch_unlock)
            post_epoch_unlock_list->push_back(schema, mem_root);
          else
            ndbcluster_update_slock(thd, schema->db, schema->name,
                                    schema->id, schema->version);
        }
      }
      DBUG_RETURN(0);
    }
    /*
      the normal case of UPDATE/INSERT has already been handled
    */
    switch (ev_type)
    {
    case NDBEVENT::TE_DELETE:
      // skip
      break;
    case NDBEVENT::TE_CLUSTER_FAILURE:
      if (opt_ndb_extra_logging)
        sql_print_information("NDB Binlog: cluster failure for %s at epoch %u/%u.",
                              ndb_schema_share->key,
                              (uint)(pOp->getGCI() >> 32),
                              (uint)(pOp->getGCI()));
      // fall through
    case NDBEVENT::TE_DROP:
      if (opt_ndb_extra_logging &&
          ndb_binlog_tables_inited && ndb_binlog_running)
        sql_print_information("NDB Binlog: ndb tables initially "
                              "read only on reconnect.");

      /* begin protect ndb_schema_share */
      pthread_mutex_lock(&ndb_schema_share_mutex);
      /* ndb_share reference binlog extra free */
      DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
                               ndb_schema_share->key,
                               ndb_schema_share->use_count));
      free_share(&ndb_schema_share);
      ndb_schema_share= 0;
      ndb_binlog_tables_inited= FALSE;
      ndb_binlog_is_ready= FALSE;
      pthread_mutex_unlock(&ndb_schema_share_mutex);
      /* end protect ndb_schema_share */

      close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE);
      // fall through
    case NDBEVENT::TE_ALTER:
      ndb_handle_schema_change(thd, s_ndb, pOp, event_data);
      break;
    /*
      The cases below update tmp_share->subscriber_bitmap[], indexed by
      the data node's g_node_id_map entry, and wake injector_cond waiters.
      NOTE(review): an unmapped node (0xFF) is only caught by DBUG_ASSERT;
      in release builds it would index subscriber_bitmap[0xFF] -- confirm
      the array is large enough or add a runtime check.
    */
    case NDBEVENT::TE_NODE_FAILURE:
    {
      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
      DBUG_ASSERT(node_id != 0xFF);
      pthread_mutex_lock(&tmp_share->mutex);
      bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]);
      DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
      if (opt_ndb_extra_logging)
      {
        sql_print_information("NDB Binlog: Node: %d, down,"
                              " Subscriber bitmask %x%x",
                              pOp->getNdbdNodeId(),
                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
      }
      pthread_mutex_unlock(&tmp_share->mutex);
      (void) pthread_cond_signal(&injector_cond);
      break;
    }
    case NDBEVENT::TE_SUBSCRIBE:
    {
      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
      uint8 req_id= pOp->getReqNodeId();
      DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
      pthread_mutex_lock(&tmp_share->mutex);
      bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
      DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
      if (opt_ndb_extra_logging)
      {
        sql_print_information("NDB Binlog: Node: %d, subscribe from node %d,"
                              " Subscriber bitmask %x%x",
                              pOp->getNdbdNodeId(),
                              req_id,
                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
      }
      pthread_mutex_unlock(&tmp_share->mutex);
      (void) pthread_cond_signal(&injector_cond);
      break;
    }
    case NDBEVENT::TE_UNSUBSCRIBE:
    {
      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
      uint8 req_id= pOp->getReqNodeId();
      DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
      pthread_mutex_lock(&tmp_share->mutex);
      bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
      DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
      if (opt_ndb_extra_logging)
      {
        sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d,"
                              " Subscriber bitmask %x%x",
                              pOp->getNdbdNodeId(),
                              req_id,
                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
      }
      pthread_mutex_unlock(&tmp_share->mutex);
      (void) pthread_cond_signal(&injector_cond);
      break;
    }
    default:
      sql_print_error("NDB Binlog: unknown non data event %d for %s. "
                      "Ignoring...", (unsigned) ev_type, tmp_share->key);
    }
  }
  DBUG_RETURN(0);
}
2823
2824 /*
2825 process any operations that should be done after
2826 the epoch is complete
2827 */
2828 static void
ndb_binlog_thread_handle_schema_event_post_epoch(THD * thd,List<Cluster_schema> * post_epoch_log_list,List<Cluster_schema> * post_epoch_unlock_list)2829 ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
2830 List<Cluster_schema>
2831 *post_epoch_log_list,
2832 List<Cluster_schema>
2833 *post_epoch_unlock_list)
2834 {
2835 if (post_epoch_log_list->elements == 0)
2836 return;
2837 DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
2838 Cluster_schema *schema;
2839 Thd_ndb *thd_ndb= get_thd_ndb(thd);
2840 Ndb *ndb= thd_ndb->ndb;
2841 NDBDICT *dict= ndb->getDictionary();
2842 while ((schema= post_epoch_log_list->pop()))
2843 {
2844 Thd_ndb_options_guard thd_ndb_options(thd_ndb);
2845 DBUG_PRINT("info",
2846 ("%s.%s: log query_length: %d query: '%s' type: %d",
2847 schema->db, schema->name,
2848 schema->query_length, schema->query,
2849 schema->type));
2850 int log_query= 0;
2851 {
2852 enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
2853 char key[FN_REFLEN + 1];
2854 build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
2855 if (schema_type == SOT_CLEAR_SLOCK)
2856 {
2857 pthread_mutex_lock(&ndbcluster_mutex);
2858 NDB_SCHEMA_OBJECT *ndb_schema_object=
2859 (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
2860 (const uchar*) key, strlen(key));
2861 if (ndb_schema_object &&
2862 (ndb_schema_object->table_id == schema->id &&
2863 ndb_schema_object->table_version == schema->version))
2864 {
2865 pthread_mutex_lock(&ndb_schema_object->mutex);
2866 if (opt_ndb_extra_logging > 19)
2867 {
2868 sql_print_information("NDB: CLEAR_SLOCK key: %s(%u/%u) from"
2869 " %x%x to %x%x",
2870 key, schema->id, schema->version,
2871 ndb_schema_object->slock[0],
2872 ndb_schema_object->slock[1],
2873 schema->slock[0],
2874 schema->slock[1]);
2875 }
2876 memcpy(ndb_schema_object->slock, schema->slock,
2877 sizeof(ndb_schema_object->slock));
2878 DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
2879 (uchar*)ndb_schema_object->slock_bitmap.bitmap,
2880 no_bytes_in_map(&ndb_schema_object->slock_bitmap));
2881 pthread_mutex_unlock(&ndb_schema_object->mutex);
2882 pthread_cond_signal(&injector_cond);
2883 }
2884 else if (opt_ndb_extra_logging > 19)
2885 {
2886 if (ndb_schema_object == 0)
2887 {
2888 sql_print_information("NDB: Discarding event...no obj: %s (%u/%u)",
2889 key, schema->id, schema->version);
2890 }
2891 else
2892 {
2893 sql_print_information("NDB: Discarding event...key: %s "
2894 "non matching id/version [%u/%u] != [%u/%u]",
2895 key,
2896 ndb_schema_object->table_id,
2897 ndb_schema_object->table_version,
2898 schema->id,
2899 schema->version);
2900 }
2901 }
2902 pthread_mutex_unlock(&ndbcluster_mutex);
2903 continue;
2904 }
2905 /* ndb_share reference temporary, free below */
2906 NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
2907 if (share)
2908 {
2909 DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u",
2910 share->key, share->use_count));
2911 }
2912 switch (schema_type)
2913 {
2914 case SOT_DROP_DB:
2915 log_query= 1;
2916 break;
2917 case SOT_DROP_TABLE:
2918 if (opt_ndb_extra_logging > 9)
2919 sql_print_information("SOT_DROP_TABLE %s.%s", schema->db, schema->name);
2920 log_query= 1;
2921 {
2922 ndb->setDatabaseName(schema->db);
2923 Ndb_table_guard ndbtab_g(dict, schema->name);
2924 ndbtab_g.invalidate();
2925 }
2926 {
2927 TABLE_LIST table_list;
2928 memset(&table_list, 0, sizeof(table_list));
2929 table_list.db= schema->db;
2930 table_list.alias= table_list.table_name= schema->name;
2931 close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
2932 }
2933 break;
2934 case SOT_RENAME_TABLE:
2935 if (opt_ndb_extra_logging > 9)
2936 sql_print_information("SOT_RENAME_TABLE %s.%s", schema->db, schema->name);
2937 log_query= 1;
2938 if (share)
2939 {
2940 ndbcluster_rename_share(thd, share);
2941 }
2942 break;
2943 case SOT_RENAME_TABLE_PREPARE:
2944 if (opt_ndb_extra_logging > 9)
2945 sql_print_information("SOT_RENAME_TABLE_PREPARE %s.%s -> %s",
2946 schema->db, schema->name, schema->query);
2947 if (share &&
2948 schema->node_id != g_ndb_cluster_connection->node_id())
2949 ndbcluster_prepare_rename_share(share, schema->query);
2950 break;
2951 case SOT_ALTER_TABLE_COMMIT:
2952 if (opt_ndb_extra_logging > 9)
2953 sql_print_information("SOT_ALTER_TABLE_COMMIT %s.%s", schema->db, schema->name);
2954 if (schema->node_id == g_ndb_cluster_connection->node_id())
2955 break;
2956 log_query= 1;
2957 {
2958 ndb->setDatabaseName(schema->db);
2959 Ndb_table_guard ndbtab_g(dict, schema->name);
2960 ndbtab_g.invalidate();
2961 }
2962 {
2963 TABLE_LIST table_list;
2964 memset(&table_list, 0, sizeof(table_list));
2965 table_list.db= schema->db;
2966 table_list.alias= table_list.table_name= schema->name;
2967 close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
2968 }
2969 if (share)
2970 {
2971 if (share->op)
2972 {
2973 Ndb_event_data *event_data= (Ndb_event_data *) share->op->getCustomData();
2974 if (event_data)
2975 delete event_data;
2976 share->op->setCustomData(NULL);
2977 {
2978 Mutex_guard injector_mutex_g(injector_mutex);
2979 injector_ndb->dropEventOperation(share->op);
2980 }
2981 share->op= 0;
2982 free_share(&share);
2983 }
2984 free_share(&share);
2985 }
2986
2987 if (share)
2988 {
2989 /*
2990 Free the share pointer early, ndb_create_table_from_engine()
2991 may delete what share is pointing to as a sideeffect
2992 */
2993 DBUG_PRINT("NDB_SHARE", ("%s early free, use_count: %u",
2994 share->key, share->use_count));
2995 free_share(&share);
2996 share= 0;
2997 }
2998
2999 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
3000 if (ndbcluster_check_if_local_table(schema->db, schema->name) &&
3001 !Ndb_dist_priv_util::is_distributed_priv_table(schema->db,
3002 schema->name))
3003 {
3004 sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' "
3005 "from binlog schema event '%s' from node %d.",
3006 schema->db, schema->name, schema->query,
3007 schema->node_id);
3008 }
3009 else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
3010 {
3011 print_could_not_discover_error(thd, schema);
3012 }
3013 break;
3014
3015 case SOT_ONLINE_ALTER_TABLE_PREPARE:
3016 {
3017 if (opt_ndb_extra_logging > 9)
3018 sql_print_information("SOT_ONLINE_ALTER_TABLE_PREPARE %s.%s", schema->db, schema->name);
3019 int error= 0;
3020 ndb->setDatabaseName(schema->db);
3021 {
3022 Ndb_table_guard ndbtab_g(dict, schema->name);
3023 ndbtab_g.get_table();
3024 ndbtab_g.invalidate();
3025 }
3026 Ndb_table_guard ndbtab_g(dict, schema->name);
3027 const NDBTAB *ndbtab= ndbtab_g.get_table();
3028 /*
3029 Refresh local frm file and dictionary cache if
3030 remote on-line alter table
3031 */
3032 TABLE_LIST table_list;
3033 memset(&table_list, 0, sizeof(table_list));
3034 table_list.db= (char *)schema->db;
3035 table_list.alias= table_list.table_name= (char *)schema->name;
3036 close_cached_tables(thd, &table_list, TRUE, FALSE, FALSE);
3037
3038 if (schema->node_id != g_ndb_cluster_connection->node_id())
3039 {
3040 char key[FN_REFLEN];
3041 uchar *data= 0, *pack_data= 0;
3042 size_t length, pack_length;
3043
3044 DBUG_PRINT("info", ("Detected frm change of table %s.%s",
3045 schema->db, schema->name));
3046 log_query= 1;
3047 build_table_filename(key, FN_LEN-1, schema->db, schema->name, NullS, 0);
3048 /*
3049 If the there is no local table shadowing the altered table and
3050 it has an frm that is different than the one on disk then
3051 overwrite it with the new table definition
3052 */
3053 if (!ndbcluster_check_if_local_table(schema->db, schema->name) &&
3054 readfrm(key, &data, &length) == 0 &&
3055 packfrm(data, length, &pack_data, &pack_length) == 0 &&
3056 cmp_frm(ndbtab, pack_data, pack_length))
3057 {
3058 DBUG_DUMP("frm", (uchar*) ndbtab->getFrmData(),
3059 ndbtab->getFrmLength());
3060 my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
3061 data= NULL;
3062 if ((error= unpackfrm(&data, &length,
3063 (const uchar*) ndbtab->getFrmData())) ||
3064 (error= writefrm(key, data, length)))
3065 {
3066 sql_print_error("NDB: Failed write frm for %s.%s, error %d",
3067 schema->db, schema->name, error);
3068 }
3069 }
3070 my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
3071 my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
3072 }
3073 if (share)
3074 {
3075 if (opt_ndb_extra_logging > 9)
3076 sql_print_information("NDB Binlog: handeling online alter/rename");
3077
3078 pthread_mutex_lock(&share->mutex);
3079 ndbcluster_binlog_close_table(thd, share);
3080
3081 if ((error= ndb_binlog_open_shadow_table(thd, share)))
3082 sql_print_error("NDB Binlog: Failed to re-open shadow table %s.%s",
3083 schema->db, schema->name);
3084 if (error)
3085 pthread_mutex_unlock(&share->mutex);
3086 }
3087 if (!error && share)
3088 {
3089 if (share->event_data->shadow_table->s->primary_key == MAX_KEY)
3090 share->flags|= NSF_HIDDEN_PK;
3091 /*
3092 Refresh share->flags to handle added BLOB columns
3093 */
3094 if (share->event_data->shadow_table->s->blob_fields != 0)
3095 share->flags|= NSF_BLOB_FLAG;
3096
3097 /*
3098 Start subscribing to data changes to the new table definition
3099 */
3100 String event_name(INJECTOR_EVENT_LEN);
3101 ndb_rep_event_name(&event_name, schema->db, schema->name,
3102 get_binlog_full(share));
3103 NdbEventOperation *tmp_op= share->op;
3104 share->new_op= 0;
3105 share->op= 0;
3106
3107 if (ndbcluster_create_event_ops(thd, share, ndbtab, event_name.c_ptr()))
3108 {
3109 sql_print_error("NDB Binlog:"
3110 "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
3111 event_name.c_ptr());
3112 }
3113 else
3114 {
3115 share->new_op= share->op;
3116 }
3117 share->op= tmp_op;
3118 pthread_mutex_unlock(&share->mutex);
3119
3120 if (opt_ndb_extra_logging > 9)
3121 sql_print_information("NDB Binlog: handeling online alter/rename done");
3122 }
3123 break;
3124 }
3125 case SOT_ONLINE_ALTER_TABLE_COMMIT:
3126 {
3127 if (opt_ndb_extra_logging > 9)
3128 sql_print_information("SOT_ONLINE_ALTER_TABLE_COMMIT %s.%s", schema->db, schema->name);
3129 if (share)
3130 {
3131 pthread_mutex_lock(&share->mutex);
3132 if (share->op && share->new_op)
3133 {
3134 Ndb_event_data *event_data= (Ndb_event_data *) share->op->getCustomData();
3135 if (event_data)
3136 delete event_data;
3137 share->op->setCustomData(NULL);
3138 {
3139 Mutex_guard injector_mutex_g(injector_mutex);
3140 injector_ndb->dropEventOperation(share->op);
3141 }
3142 share->op= share->new_op;
3143 share->new_op= 0;
3144 free_share(&share);
3145 }
3146 pthread_mutex_unlock(&share->mutex);
3147 }
3148 break;
3149 }
3150 case SOT_RENAME_TABLE_NEW:
3151 if (opt_ndb_extra_logging > 9)
3152 sql_print_information("SOT_RENAME_TABLE_NEW %s.%s", schema->db, schema->name);
3153 log_query= 1;
3154 if (ndb_binlog_running && (!share || !share->op))
3155 {
3156 /*
3157 we need to free any share here as command below
3158 may need to call handle_trailing_share
3159 */
3160 if (share)
3161 {
3162 /* ndb_share reference temporary free */
3163 DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
3164 share->key, share->use_count));
3165 free_share(&share);
3166 share= 0;
3167 }
3168 thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
3169 if (ndbcluster_check_if_local_table(schema->db, schema->name))
3170 {
3171 DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'",
3172 schema->db, schema->name));
3173 sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from "
3174 "binlog schema event '%s' from node %d. ",
3175 schema->db, schema->name, schema->query,
3176 schema->node_id);
3177 }
3178 else if (ndb_create_table_from_engine(thd, schema->db, schema->name))
3179 {
3180 print_could_not_discover_error(thd, schema);
3181 }
3182 }
3183 break;
3184 default:
3185 DBUG_ASSERT(FALSE);
3186 }
3187 if (share)
3188 {
3189 /* ndb_share reference temporary free */
3190 DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u",
3191 share->key, share->use_count));
3192 free_share(&share);
3193 share= 0;
3194 }
3195 }
3196 if (ndb_binlog_running && log_query)
3197 ndb_binlog_query(thd, schema);
3198 }
3199 while ((schema= post_epoch_unlock_list->pop()))
3200 {
3201 ndbcluster_update_slock(thd, schema->db, schema->name,
3202 schema->id, schema->version);
3203 }
3204 DBUG_VOID_RETURN;
3205 }
3206
3207 /*
3208 Timer class for doing performance measurements
3209 */
3210
3211 /*********************************************************************
3212 Internal helper functions for handeling of the cluster replication tables
3213 - ndb_binlog_index
3214 - ndb_apply_status
3215 *********************************************************************/
3216
3217 /*
3218 struct to hold the data to be inserted into the
3219 ndb_binlog_index table
3220 */
3221 struct ndb_binlog_index_row {
3222 ulonglong epoch;
3223 const char *master_log_file;
3224 ulonglong master_log_pos;
3225 ulong n_inserts;
3226 ulong n_updates;
3227 ulong n_deletes;
3228 ulong n_schemaops;
3229
3230 ulong orig_server_id;
3231 ulonglong orig_epoch;
3232
3233 ulong gci;
3234
3235 struct ndb_binlog_index_row *next;
3236 };
3237
3238
3239 /*
3240 Open the ndb_binlog_index table for writing
3241 */
3242 static int
ndb_binlog_index_table__open(THD * thd,TABLE ** ndb_binlog_index)3243 ndb_binlog_index_table__open(THD *thd,
3244 TABLE **ndb_binlog_index)
3245 {
3246 const char *save_proc_info=
3247 thd_proc_info(thd, "Opening " NDB_REP_DB "." NDB_REP_TABLE);
3248
3249 TABLE_LIST tables;
3250 tables.init_one_table(STRING_WITH_LEN(NDB_REP_DB), // db
3251 STRING_WITH_LEN(NDB_REP_TABLE), // name
3252 NDB_REP_TABLE, // alias
3253 TL_WRITE); // for write
3254
3255 /* Only allow real table to be opened */
3256 tables.required_type= FRMTYPE_TABLE;
3257
3258 const bool derived = false;
3259 const uint flags =
3260 MYSQL_LOCK_IGNORE_TIMEOUT; /* Wait for lock "infinitely" */
3261 if (open_and_lock_tables(thd, &tables, derived, flags))
3262 {
3263 if (thd->killed)
3264 sql_print_error("NDB Binlog: Opening ndb_binlog_index: killed");
3265 else
3266 sql_print_error("NDB Binlog: Opening ndb_binlog_index: %d, '%s'",
3267 thd->get_stmt_da()->sql_errno(),
3268 thd->get_stmt_da()->message());
3269 thd_proc_info(thd, save_proc_info);
3270 return -1;
3271 }
3272 *ndb_binlog_index= tables.table;
3273 thd_proc_info(thd, save_proc_info);
3274 return 0;
3275 }
3276
3277
3278 /*
3279 Write rows to the ndb_binlog_index table
3280 */
3281 static int
ndb_binlog_index_table__write_rows(THD * thd,ndb_binlog_index_row * row)3282 ndb_binlog_index_table__write_rows(THD *thd,
3283 ndb_binlog_index_row *row)
3284 {
3285 int error= 0;
3286 ndb_binlog_index_row *first= row;
3287 TABLE *ndb_binlog_index= 0;
3288
3289 /*
3290 Assume this function is not called with an error set in thd
3291 (but clear for safety in release version)
3292 */
3293 assert(!thd->is_error());
3294 thd->clear_error();
3295
3296 /*
3297 Turn of binlogging to prevent the table changes to be written to
3298 the binary log.
3299 */
3300 tmp_disable_binlog(thd);
3301
3302 if (ndb_binlog_index_table__open(thd, &ndb_binlog_index))
3303 {
3304 sql_print_error("NDB Binlog: Unable to open ndb_binlog_index table");
3305 error= -1;
3306 goto add_ndb_binlog_index_err;
3307 }
3308
3309 // Set all columns to be written
3310 ndb_binlog_index->use_all_columns();
3311
3312 do
3313 {
3314 ulonglong epoch= 0, orig_epoch= 0;
3315 uint orig_server_id= 0;
3316
3317 // Intialize ndb_binlog_index->record[0]
3318 empty_record(ndb_binlog_index);
3319
3320 ndb_binlog_index->field[0]->store(first->master_log_pos, true);
3321 ndb_binlog_index->field[1]->store(first->master_log_file,
3322 strlen(first->master_log_file),
3323 &my_charset_bin);
3324 ndb_binlog_index->field[2]->store(epoch= first->epoch, true);
3325 if (ndb_binlog_index->s->fields > 7)
3326 {
3327 ndb_binlog_index->field[3]->store(row->n_inserts, true);
3328 ndb_binlog_index->field[4]->store(row->n_updates, true);
3329 ndb_binlog_index->field[5]->store(row->n_deletes, true);
3330 ndb_binlog_index->field[6]->store(row->n_schemaops, true);
3331 ndb_binlog_index->field[7]->store(orig_server_id= row->orig_server_id, true);
3332 ndb_binlog_index->field[8]->store(orig_epoch= row->orig_epoch, true);
3333 ndb_binlog_index->field[9]->store(first->gci, true);
3334 row= row->next;
3335 }
3336 else
3337 {
3338 while ((row= row->next))
3339 {
3340 first->n_inserts+= row->n_inserts;
3341 first->n_updates+= row->n_updates;
3342 first->n_deletes+= row->n_deletes;
3343 first->n_schemaops+= row->n_schemaops;
3344 }
3345 ndb_binlog_index->field[3]->store((ulonglong)first->n_inserts, true);
3346 ndb_binlog_index->field[4]->store((ulonglong)first->n_updates, true);
3347 ndb_binlog_index->field[5]->store((ulonglong)first->n_deletes, true);
3348 ndb_binlog_index->field[6]->store((ulonglong)first->n_schemaops, true);
3349 }
3350
3351 if ((error= ndb_binlog_index->file->ha_write_row(ndb_binlog_index->record[0])))
3352 {
3353 char tmp[128];
3354 if (ndb_binlog_index->s->fields > 7)
3355 my_snprintf(tmp, sizeof(tmp), "%u/%u,%u,%u/%u",
3356 uint(epoch >> 32), uint(epoch),
3357 orig_server_id,
3358 uint(orig_epoch >> 32), uint(orig_epoch));
3359
3360 else
3361 my_snprintf(tmp, sizeof(tmp), "%u/%u", uint(epoch >> 32), uint(epoch));
3362 sql_print_error("NDB Binlog: Writing row (%s) to ndb_binlog_index: %d",
3363 tmp, error);
3364 error= -1;
3365 goto add_ndb_binlog_index_err;
3366 }
3367 } while (row);
3368
3369 add_ndb_binlog_index_err:
3370 /*
3371 Explicitly commit or rollback the writes(although we normally
3372 use a non transactional engine for the ndb_binlog_index table)
3373 */
3374 thd->get_stmt_da()->set_overwrite_status(true);
3375 thd->is_error() ? trans_rollback_stmt(thd) : trans_commit_stmt(thd);
3376 thd->get_stmt_da()->set_overwrite_status(false);
3377
3378 // Close the tables this thread has opened
3379 close_thread_tables(thd);
3380
3381 /*
3382 There should be no need for rolling back transaction due to deadlock
3383 (since ndb_binlog_index is non transactional).
3384 */
3385 DBUG_ASSERT(! thd->transaction_rollback_request);
3386
3387 // Release MDL locks on the opened table
3388 thd->mdl_context.release_transactional_locks();
3389
3390 reenable_binlog(thd);
3391 return error;
3392 }
3393
3394 /*********************************************************************
3395 Functions for start, stop, wait for ndbcluster binlog thread
3396 *********************************************************************/
3397
3398 pthread_handler_t ndb_binlog_thread_func(void *arg);
3399
ndbcluster_binlog_start()3400 int ndbcluster_binlog_start()
3401 {
3402 DBUG_ENTER("ndbcluster_binlog_start");
3403
3404 if (::server_id == 0)
3405 {
3406 sql_print_warning("NDB: server id set to zero - changes logged to "
3407 "bin log with server id zero will be logged with "
3408 "another server id by slave mysqlds");
3409 }
3410
3411 /*
3412 Check that ServerId is not using the reserved bit or bits reserved
3413 for application use
3414 */
3415 if ((::server_id & 0x1 << 31) || // Reserved bit
3416 !ndbcluster_anyvalue_is_serverid_in_range(::server_id)) // server_id_bits
3417 {
3418 sql_print_error("NDB: server id provided is too large to be represented in "
3419 "opt_server_id_bits or is reserved");
3420 DBUG_RETURN(-1);
3421 }
3422
3423 pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
3424 pthread_cond_init(&injector_cond, NULL);
3425 pthread_mutex_init(&ndb_schema_share_mutex, MY_MUTEX_INIT_FAST);
3426
3427 /* Create injector thread */
3428 if (pthread_create(&ndb_binlog_thread, &connection_attrib,
3429 ndb_binlog_thread_func, 0))
3430 {
3431 DBUG_PRINT("error", ("Could not create ndb injector thread"));
3432 pthread_cond_destroy(&injector_cond);
3433 pthread_mutex_destroy(&injector_mutex);
3434 DBUG_RETURN(-1);
3435 }
3436
3437 ndbcluster_binlog_inited= 1;
3438
3439 /* Wait for the injector thread to start */
3440 pthread_mutex_lock(&injector_mutex);
3441 while (!ndb_binlog_thread_running)
3442 pthread_cond_wait(&injector_cond, &injector_mutex);
3443 pthread_mutex_unlock(&injector_mutex);
3444
3445 if (ndb_binlog_thread_running < 0)
3446 DBUG_RETURN(-1);
3447
3448 DBUG_RETURN(0);
3449 }
3450
3451
3452 /**************************************************************
3453 Internal helper functions for creating/dropping ndb events
3454 used by the client sql threads
3455 **************************************************************/
3456 void
ndb_rep_event_name(String * event_name,const char * db,const char * tbl,my_bool full)3457 ndb_rep_event_name(String *event_name,const char *db, const char *tbl,
3458 my_bool full)
3459 {
3460 if (full)
3461 event_name->set_ascii("REPLF$", 6);
3462 else
3463 event_name->set_ascii("REPL$", 5);
3464 event_name->append(db);
3465 #ifdef NDB_WIN32
3466 /*
3467 * Some bright spark decided that we should sometimes have backslashes.
3468 * This causes us pain as the event is db/table and not db\table so trying
3469 * to drop db\table when we meant db/table ends in the event lying around
3470 * after drop table, leading to all sorts of pain.
3471 */
3472 String backslash_sep(1);
3473 backslash_sep.set_ascii("\\",1);
3474
3475 int bsloc;
3476 if((bsloc= event_name->strstr(backslash_sep,0))!=-1)
3477 event_name->replace(bsloc, 1, "/", 1);
3478 #endif
3479 if (tbl)
3480 {
3481 event_name->append('/');
3482 event_name->append(tbl);
3483 }
3484 DBUG_PRINT("info", ("ndb_rep_event_name: %s", event_name->c_ptr()));
3485 }
3486
3487 #ifdef HAVE_NDB_BINLOG
3488 static void
set_binlog_flags(NDB_SHARE * share,Ndb_binlog_type ndb_binlog_type)3489 set_binlog_flags(NDB_SHARE *share,
3490 Ndb_binlog_type ndb_binlog_type)
3491 {
3492 DBUG_ENTER("set_binlog_flags");
3493 switch (ndb_binlog_type)
3494 {
3495 case NBT_NO_LOGGING:
3496 DBUG_PRINT("info", ("NBT_NO_LOGGING"));
3497 set_binlog_nologging(share);
3498 DBUG_VOID_RETURN;
3499 case NBT_DEFAULT:
3500 DBUG_PRINT("info", ("NBT_DEFAULT"));
3501 if (opt_ndb_log_updated_only)
3502 {
3503 set_binlog_updated_only(share);
3504 }
3505 else
3506 {
3507 set_binlog_full(share);
3508 }
3509 if (opt_ndb_log_update_as_write)
3510 {
3511 set_binlog_use_write(share);
3512 }
3513 else
3514 {
3515 set_binlog_use_update(share);
3516 }
3517 break;
3518 case NBT_UPDATED_ONLY:
3519 DBUG_PRINT("info", ("NBT_UPDATED_ONLY"));
3520 set_binlog_updated_only(share);
3521 set_binlog_use_write(share);
3522 break;
3523 case NBT_USE_UPDATE:
3524 DBUG_PRINT("info", ("NBT_USE_UPDATE"));
3525 case NBT_UPDATED_ONLY_USE_UPDATE:
3526 DBUG_PRINT("info", ("NBT_UPDATED_ONLY_USE_UPDATE"));
3527 set_binlog_updated_only(share);
3528 set_binlog_use_update(share);
3529 break;
3530 case NBT_FULL:
3531 DBUG_PRINT("info", ("NBT_FULL"));
3532 set_binlog_full(share);
3533 set_binlog_use_write(share);
3534 break;
3535 case NBT_FULL_USE_UPDATE:
3536 DBUG_PRINT("info", ("NBT_FULL_USE_UPDATE"));
3537 set_binlog_full(share);
3538 set_binlog_use_update(share);
3539 break;
3540 }
3541 set_binlog_logging(share);
3542 DBUG_VOID_RETURN;
3543 }
3544
3545
slave_reset_conflict_fn(NDB_SHARE * share)3546 inline void slave_reset_conflict_fn(NDB_SHARE *share)
3547 {
3548 NDB_CONFLICT_FN_SHARE *cfn_share= share->m_cfn_share;
3549 if (cfn_share)
3550 {
3551 memset(cfn_share, 0, sizeof(*cfn_share));
3552 }
3553 }
3554
3555 static uint
slave_check_resolve_col_type(const NDBTAB * ndbtab,uint field_index)3556 slave_check_resolve_col_type(const NDBTAB *ndbtab,
3557 uint field_index)
3558 {
3559 DBUG_ENTER("slave_check_resolve_col_type");
3560 const NDBCOL *c= ndbtab->getColumn(field_index);
3561 uint sz= 0;
3562 switch (c->getType())
3563 {
3564 case NDBCOL::Unsigned:
3565 sz= sizeof(Uint32);
3566 DBUG_PRINT("info", ("resolve column Uint32 %u",
3567 field_index));
3568 break;
3569 case NDBCOL::Bigunsigned:
3570 sz= sizeof(Uint64);
3571 DBUG_PRINT("info", ("resolve column Uint64 %u",
3572 field_index));
3573 break;
3574 default:
3575 DBUG_PRINT("info", ("resolve column %u has wrong type",
3576 field_index));
3577 break;
3578 }
3579 DBUG_RETURN(sz);
3580 }
3581
3582 static int
slave_set_resolve_fn(THD * thd,NDB_SHARE * share,const NDBTAB * ndbtab,uint field_index,uint resolve_col_sz,const st_conflict_fn_def * conflict_fn,TABLE * table,uint8 flags)3583 slave_set_resolve_fn(THD *thd, NDB_SHARE *share,
3584 const NDBTAB *ndbtab, uint field_index,
3585 uint resolve_col_sz,
3586 const st_conflict_fn_def* conflict_fn,
3587 TABLE *table,
3588 uint8 flags)
3589 {
3590 DBUG_ENTER("slave_set_resolve_fn");
3591
3592 Thd_ndb *thd_ndb= get_thd_ndb(thd);
3593 Ndb *ndb= thd_ndb->ndb;
3594 NDBDICT *dict= ndb->getDictionary();
3595 NDB_CONFLICT_FN_SHARE *cfn_share= share->m_cfn_share;
3596 if (cfn_share == NULL)
3597 {
3598 share->m_cfn_share= cfn_share= (NDB_CONFLICT_FN_SHARE*)
3599 alloc_root(&share->mem_root, sizeof(NDB_CONFLICT_FN_SHARE));
3600 slave_reset_conflict_fn(share);
3601 }
3602 cfn_share->m_conflict_fn= conflict_fn;
3603
3604 /* Calculate resolve col stuff (if relevant) */
3605 cfn_share->m_resolve_size= resolve_col_sz;
3606 cfn_share->m_resolve_column= field_index;
3607 cfn_share->m_resolve_offset= (uint16)(table->field[field_index]->ptr -
3608 table->record[0]);
3609 cfn_share->m_flags = flags;
3610
3611 {
3612 /* get exceptions table */
3613 char ex_tab_name[FN_REFLEN];
3614 strxnmov(ex_tab_name, sizeof(ex_tab_name), share->table_name,
3615 lower_case_table_names ? NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER :
3616 NDB_EXCEPTIONS_TABLE_SUFFIX, NullS);
3617 ndb->setDatabaseName(share->db);
3618 Ndb_table_guard ndbtab_g(dict, ex_tab_name);
3619 const NDBTAB *ex_tab= ndbtab_g.get_table();
3620 if (ex_tab)
3621 {
3622 const int fixed_cols= 4;
3623 bool ok=
3624 ex_tab->getNoOfColumns() >= fixed_cols &&
3625 ex_tab->getNoOfPrimaryKeys() == 4 &&
3626 /* server id */
3627 ex_tab->getColumn(0)->getType() == NDBCOL::Unsigned &&
3628 ex_tab->getColumn(0)->getPrimaryKey() &&
3629 /* master_server_id */
3630 ex_tab->getColumn(1)->getType() == NDBCOL::Unsigned &&
3631 ex_tab->getColumn(1)->getPrimaryKey() &&
3632 /* master_epoch */
3633 ex_tab->getColumn(2)->getType() == NDBCOL::Bigunsigned &&
3634 ex_tab->getColumn(2)->getPrimaryKey() &&
3635 /* count */
3636 ex_tab->getColumn(3)->getType() == NDBCOL::Unsigned &&
3637 ex_tab->getColumn(3)->getPrimaryKey();
3638 if (ok)
3639 {
3640 int ncol= ndbtab->getNoOfColumns();
3641 int nkey= ndbtab->getNoOfPrimaryKeys();
3642 int i, k;
3643 for (i= k= 0; i < ncol && k < nkey; i++)
3644 {
3645 const NdbDictionary::Column* col= ndbtab->getColumn(i);
3646 if (col->getPrimaryKey())
3647 {
3648 const NdbDictionary::Column* ex_col=
3649 ex_tab->getColumn(fixed_cols + k);
3650 ok=
3651 ex_col != NULL &&
3652 col->getType() == ex_col->getType() &&
3653 col->getLength() == ex_col->getLength() &&
3654 col->getNullable() == ex_col->getNullable();
3655 if (!ok)
3656 break;
3657 cfn_share->m_offset[k]=
3658 (uint16)(table->field[i]->ptr - table->record[0]);
3659 k++;
3660 }
3661 }
3662 if (ok)
3663 {
3664 cfn_share->m_ex_tab= ex_tab;
3665 cfn_share->m_pk_cols= nkey;
3666 ndbtab_g.release();
3667 if (opt_ndb_extra_logging)
3668 sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
3669 table->s->db.str,
3670 table->s->table_name.str,
3671 table->s->db.str,
3672 ex_tab_name);
3673 }
3674 else
3675 sql_print_warning("NDB Slave: exceptions table %s has wrong "
3676 "definition (column %d)",
3677 ex_tab_name, fixed_cols + k);
3678 }
3679 else
3680 sql_print_warning("NDB Slave: exceptions table %s has wrong "
3681 "definition (initial %d columns)",
3682 ex_tab_name, fixed_cols);
3683 }
3684 }
3685 DBUG_RETURN(0);
3686 }
3687
3688 /**
3689 CFT_NDB_OLD
3690
3691 To perform conflict detection, an interpreted program is used to read
3692 the timestamp stored locally and compare to what was on the master.
3693 If timestamp is not equal, an error for this operation (9998) will be raised,
3694 and new row will not be applied. The error codes for the operations will
3695 be checked on return. For this to work is is vital that the operation
3696 is run with ignore error option.
3697
3698 As an independent feature, phase 2 also saves the
3699 conflicts into the table's exceptions table.
3700 */
3701 int
row_conflict_fn_old(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const uchar * old_data,const uchar * new_data,const MY_BITMAP * write_set,NdbInterpretedCode * code)3702 row_conflict_fn_old(NDB_CONFLICT_FN_SHARE* cfn_share,
3703 enum_conflicting_op_type op_type,
3704 const uchar* old_data,
3705 const uchar* new_data,
3706 const MY_BITMAP* write_set,
3707 NdbInterpretedCode* code)
3708 {
3709 DBUG_ENTER("row_conflict_fn_old");
3710 uint32 resolve_column= cfn_share->m_resolve_column;
3711 uint32 resolve_size= cfn_share->m_resolve_size;
3712 const uchar* field_ptr = old_data + cfn_share->m_resolve_offset;
3713
3714 assert((resolve_size == 4) || (resolve_size == 8));
3715
3716 if (unlikely(!bitmap_is_set(write_set, resolve_column)))
3717 {
3718 sql_print_information("NDB Slave: missing data for %s",
3719 cfn_share->m_conflict_fn->name);
3720 DBUG_RETURN(1);
3721 }
3722
3723 const uint label_0= 0;
3724 const Uint32 RegOldValue= 1, RegCurrentValue= 2;
3725 int r;
3726
3727 DBUG_PRINT("info",
3728 ("Adding interpreted filter, existing value must eq event old value"));
3729 /*
3730 * read old value from record
3731 */
3732 union {
3733 uint32 old_value_32;
3734 uint64 old_value_64;
3735 };
3736 {
3737 if (resolve_size == 4)
3738 {
3739 memcpy(&old_value_32, field_ptr, resolve_size);
3740 DBUG_PRINT("info", (" old_value_32: %u", old_value_32));
3741 }
3742 else
3743 {
3744 memcpy(&old_value_64, field_ptr, resolve_size);
3745 DBUG_PRINT("info", (" old_value_64: %llu",
3746 (unsigned long long) old_value_64));
3747 }
3748 }
3749
3750 /*
3751 * Load registers RegOldValue and RegCurrentValue
3752 */
3753 if (resolve_size == 4)
3754 r= code->load_const_u32(RegOldValue, old_value_32);
3755 else
3756 r= code->load_const_u64(RegOldValue, old_value_64);
3757 DBUG_ASSERT(r == 0);
3758 r= code->read_attr(RegCurrentValue, resolve_column);
3759 DBUG_ASSERT(r == 0);
3760 /*
3761 * if RegOldValue == RegCurrentValue goto label_0
3762 * else raise error for this row
3763 */
3764 r= code->branch_eq(RegOldValue, RegCurrentValue, label_0);
3765 DBUG_ASSERT(r == 0);
3766 r= code->interpret_exit_nok(error_conflict_fn_violation);
3767 DBUG_ASSERT(r == 0);
3768 r= code->def_label(label_0);
3769 DBUG_ASSERT(r == 0);
3770 r= code->interpret_exit_ok();
3771 DBUG_ASSERT(r == 0);
3772 r= code->finalise();
3773 DBUG_ASSERT(r == 0);
3774 DBUG_RETURN(r);
3775 }
3776
3777 int
row_conflict_fn_max_update_only(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const uchar * old_data,const uchar * new_data,const MY_BITMAP * write_set,NdbInterpretedCode * code)3778 row_conflict_fn_max_update_only(NDB_CONFLICT_FN_SHARE* cfn_share,
3779 enum_conflicting_op_type op_type,
3780 const uchar* old_data,
3781 const uchar* new_data,
3782 const MY_BITMAP* write_set,
3783 NdbInterpretedCode* code)
3784 {
3785 DBUG_ENTER("row_conflict_fn_max_update_only");
3786 uint32 resolve_column= cfn_share->m_resolve_column;
3787 uint32 resolve_size= cfn_share->m_resolve_size;
3788 const uchar* field_ptr = new_data + cfn_share->m_resolve_offset;
3789
3790 assert((resolve_size == 4) || (resolve_size == 8));
3791
3792 if (unlikely(!bitmap_is_set(write_set, resolve_column)))
3793 {
3794 sql_print_information("NDB Slave: missing data for %s",
3795 cfn_share->m_conflict_fn->name);
3796 DBUG_RETURN(1);
3797 }
3798
3799 const uint label_0= 0;
3800 const Uint32 RegNewValue= 1, RegCurrentValue= 2;
3801 int r;
3802
3803 DBUG_PRINT("info",
3804 ("Adding interpreted filter, existing value must be lt event new"));
3805 /*
3806 * read new value from record
3807 */
3808 union {
3809 uint32 new_value_32;
3810 uint64 new_value_64;
3811 };
3812 {
3813 if (resolve_size == 4)
3814 {
3815 memcpy(&new_value_32, field_ptr, resolve_size);
3816 DBUG_PRINT("info", (" new_value_32: %u", new_value_32));
3817 }
3818 else
3819 {
3820 memcpy(&new_value_64, field_ptr, resolve_size);
3821 DBUG_PRINT("info", (" new_value_64: %llu",
3822 (unsigned long long) new_value_64));
3823 }
3824 }
3825 /*
3826 * Load registers RegNewValue and RegCurrentValue
3827 */
3828 if (resolve_size == 4)
3829 r= code->load_const_u32(RegNewValue, new_value_32);
3830 else
3831 r= code->load_const_u64(RegNewValue, new_value_64);
3832 DBUG_ASSERT(r == 0);
3833 r= code->read_attr(RegCurrentValue, resolve_column);
3834 DBUG_ASSERT(r == 0);
3835 /*
3836 * if RegNewValue > RegCurrentValue goto label_0
3837 * else raise error for this row
3838 */
3839 r= code->branch_gt(RegNewValue, RegCurrentValue, label_0);
3840 DBUG_ASSERT(r == 0);
3841 r= code->interpret_exit_nok(error_conflict_fn_violation);
3842 DBUG_ASSERT(r == 0);
3843 r= code->def_label(label_0);
3844 DBUG_ASSERT(r == 0);
3845 r= code->interpret_exit_ok();
3846 DBUG_ASSERT(r == 0);
3847 r= code->finalise();
3848 DBUG_ASSERT(r == 0);
3849 DBUG_RETURN(r);
3850 }
3851
3852 /**
3853 CFT_NDB_MAX
3854
3855 To perform conflict resolution, an interpreted program is used to read
3856 the timestamp stored locally and compare to what is going to be applied.
3857 If timestamp is lower, an error for this operation (9999) will be raised,
3858 and new row will not be applied. The error codes for the operations will
3859 be checked on return. For this to work is is vital that the operation
3860 is run with ignore error option.
3861
3862 Note that for delete, this algorithm reverts to the OLD algorithm.
3863 */
3864 int
row_conflict_fn_max(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const uchar * old_data,const uchar * new_data,const MY_BITMAP * write_set,NdbInterpretedCode * code)3865 row_conflict_fn_max(NDB_CONFLICT_FN_SHARE* cfn_share,
3866 enum_conflicting_op_type op_type,
3867 const uchar* old_data,
3868 const uchar* new_data,
3869 const MY_BITMAP* write_set,
3870 NdbInterpretedCode* code)
3871 {
3872 switch(op_type)
3873 {
3874 case WRITE_ROW:
3875 abort();
3876 return 1;
3877 case UPDATE_ROW:
3878 return row_conflict_fn_max_update_only(cfn_share,
3879 op_type,
3880 old_data,
3881 new_data,
3882 write_set,
3883 code);
3884 case DELETE_ROW:
3885 /* Can't use max of new image, as there's no new image
3886 * for DELETE
3887 * Use OLD instead
3888 */
3889 return row_conflict_fn_old(cfn_share,
3890 op_type,
3891 old_data,
3892 new_data,
3893 write_set,
3894 code);
3895 default:
3896 abort();
3897 return 1;
3898 }
3899 }
3900
3901
3902 /**
3903 CFT_NDB_MAX_DEL_WIN
3904
3905 To perform conflict resolution, an interpreted program is used to read
3906 the timestamp stored locally and compare to what is going to be applied.
3907 If timestamp is lower, an error for this operation (9999) will be raised,
3908 and new row will not be applied. The error codes for the operations will
3909 be checked on return. For this to work is is vital that the operation
3910 is run with ignore error option.
3911
3912 In this variant, replicated DELETEs alway succeed - no filter is added
3913 to them.
3914 */
3915
3916 int
row_conflict_fn_max_del_win(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const uchar * old_data,const uchar * new_data,const MY_BITMAP * write_set,NdbInterpretedCode * code)3917 row_conflict_fn_max_del_win(NDB_CONFLICT_FN_SHARE* cfn_share,
3918 enum_conflicting_op_type op_type,
3919 const uchar* old_data,
3920 const uchar* new_data,
3921 const MY_BITMAP* write_set,
3922 NdbInterpretedCode* code)
3923 {
3924 switch(op_type)
3925 {
3926 case WRITE_ROW:
3927 abort();
3928 return 1;
3929 case UPDATE_ROW:
3930 return row_conflict_fn_max_update_only(cfn_share,
3931 op_type,
3932 old_data,
3933 new_data,
3934 write_set,
3935 code);
3936 case DELETE_ROW:
3937 /* This variant always lets a received DELETE_ROW
3938 * succeed.
3939 */
3940 return 1;
3941 default:
3942 abort();
3943 return 1;
3944 }
3945 };
3946
3947
3948 /**
3949 CFT_NDB_EPOCH
3950
3951 */
3952
3953 int
row_conflict_fn_epoch(NDB_CONFLICT_FN_SHARE * cfn_share,enum_conflicting_op_type op_type,const uchar * old_data,const uchar * new_data,const MY_BITMAP * write_set,NdbInterpretedCode * code)3954 row_conflict_fn_epoch(NDB_CONFLICT_FN_SHARE* cfn_share,
3955 enum_conflicting_op_type op_type,
3956 const uchar* old_data,
3957 const uchar* new_data,
3958 const MY_BITMAP* write_set,
3959 NdbInterpretedCode* code)
3960 {
3961 DBUG_ENTER("row_conflict_fn_epoch");
3962 switch(op_type)
3963 {
3964 case WRITE_ROW:
3965 abort();
3966 DBUG_RETURN(1);
3967 case UPDATE_ROW:
3968 case DELETE_ROW:
3969 {
3970 const uint label_0= 0;
3971 const Uint32
3972 RegAuthor= 1, RegZero= 2,
3973 RegMaxRepEpoch= 1, RegRowEpoch= 2;
3974 int r;
3975
3976 r= code->load_const_u32(RegZero, 0);
3977 assert(r == 0);
3978 r= code->read_attr(RegAuthor, NdbDictionary::Column::ROW_AUTHOR);
3979 assert(r == 0);
3980 /* If last author was not local, assume no conflict */
3981 r= code->branch_ne(RegZero, RegAuthor, label_0);
3982 assert(r == 0);
3983
3984 /*
3985 * Load registers RegMaxRepEpoch and RegRowEpoch
3986 */
3987 r= code->load_const_u64(RegMaxRepEpoch, g_ndb_slave_state.max_rep_epoch);
3988 assert(r == 0);
3989 r= code->read_attr(RegRowEpoch, NdbDictionary::Column::ROW_GCI64);
3990 assert(r == 0);
3991
3992 /*
3993 * if RegRowEpoch <= RegMaxRepEpoch goto label_0
3994 * else raise error for this row
3995 */
3996 r= code->branch_le(RegRowEpoch, RegMaxRepEpoch, label_0);
3997 assert(r == 0);
3998 r= code->interpret_exit_nok(error_conflict_fn_violation);
3999 assert(r == 0);
4000 r= code->def_label(label_0);
4001 assert(r == 0);
4002 r= code->interpret_exit_ok();
4003 assert(r == 0);
4004 r= code->finalise();
4005 assert(r == 0);
4006 DBUG_RETURN(r);
4007 }
4008 default:
4009 abort();
4010 DBUG_RETURN(1);
4011 }
4012 };
4013
/* Argument list for NDB$MAX, NDB$OLD and NDB$MAX_DELETE_WIN: one required column name */
static const st_conflict_fn_arg_def resolve_col_args[]=
{
  /* Arg type Optional */
  { CFAT_COLUMN_NAME, false },
  { CFAT_END, false }
};

/* Argument list for NDB$EPOCH: one optional number of extra GCI bits */
static const st_conflict_fn_arg_def epoch_fn_args[]=
{
  /* Arg type Optional */
  { CFAT_EXTRA_GCI_BITS, true },
  { CFAT_END, false }
};

/*
  Known conflict functions.

  ORDER MATTERS: parse_conflict_fn_spec() matches names in array order
  with strncmp() over strlen(name) characters, i.e. as a prefix. A name
  must therefore come before any other name that is a prefix of it
  ("NDB$MAX_DELETE_WIN" before "NDB$MAX"); otherwise the shorter name
  matches first and parsing fails with "missing '('".
*/
static const st_conflict_fn_def conflict_fns[]=
{
  { "NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN,
    &resolve_col_args[0], row_conflict_fn_max_del_win },
  { "NDB$MAX", CFT_NDB_MAX,
    &resolve_col_args[0], row_conflict_fn_max },
  { "NDB$OLD", CFT_NDB_OLD,
    &resolve_col_args[0], row_conflict_fn_old },
  { "NDB$EPOCH", CFT_NDB_EPOCH,
    &epoch_fn_args[0], row_conflict_fn_epoch }
};

/* Number of entries in conflict_fns */
static unsigned n_conflict_fns=
  sizeof(conflict_fns) / sizeof(struct st_conflict_fn_def);
4042
4043
4044 int
parse_conflict_fn_spec(const char * conflict_fn_spec,const st_conflict_fn_def ** conflict_fn,st_conflict_fn_arg * args,Uint32 * max_args,const TABLE * table,char * msg,uint msg_len)4045 parse_conflict_fn_spec(const char* conflict_fn_spec,
4046 const st_conflict_fn_def** conflict_fn,
4047 st_conflict_fn_arg* args,
4048 Uint32* max_args,
4049 const TABLE* table,
4050 char *msg, uint msg_len)
4051 {
4052 DBUG_ENTER("parse_conflict_fn_spec");
4053
4054 Uint32 no_args = 0;
4055 const char *ptr= conflict_fn_spec;
4056 const char *error_str= "unknown conflict resolution function";
4057 /* remove whitespace */
4058 while (*ptr == ' ' && *ptr != '\0') ptr++;
4059
4060 DBUG_PRINT("info", ("parsing %s", conflict_fn_spec));
4061
4062 for (unsigned i= 0; i < n_conflict_fns; i++)
4063 {
4064 const st_conflict_fn_def &fn= conflict_fns[i];
4065
4066 uint len= strlen(fn.name);
4067 if (strncmp(ptr, fn.name, len))
4068 continue;
4069
4070 DBUG_PRINT("info", ("found function %s", fn.name));
4071
4072 /* skip function name */
4073 ptr+= len;
4074
4075 /* remove whitespace */
4076 while (*ptr == ' ' && *ptr != '\0') ptr++;
4077
4078 /* next '(' */
4079 if (*ptr != '(')
4080 {
4081 error_str= "missing '('";
4082 DBUG_PRINT("info", ("parse error %s", error_str));
4083 break;
4084 }
4085 ptr++;
4086
4087 /* find all arguments */
4088 for (;;)
4089 {
4090 if (no_args >= *max_args)
4091 {
4092 error_str= "too many arguments";
4093 DBUG_PRINT("info", ("parse error %s", error_str));
4094 break;
4095 }
4096
4097 /* expected type */
4098 enum enum_conflict_fn_arg_type type=
4099 conflict_fns[i].arg_defs[no_args].arg_type;
4100
4101 /* remove whitespace */
4102 while (*ptr == ' ' && *ptr != '\0') ptr++;
4103
4104 if (type == CFAT_END)
4105 {
4106 args[no_args].type= type;
4107 error_str= NULL;
4108 break;
4109 }
4110
4111 /* arg */
4112 /* Todo : Should support comma as an arg separator? */
4113 const char *start_arg= ptr;
4114 while (*ptr != ')' && *ptr != ' ' && *ptr != '\0') ptr++;
4115 const char *end_arg= ptr;
4116
4117 bool optional_arg = conflict_fns[i].arg_defs[no_args].optional;
4118 /* any arg given? */
4119 if (start_arg == end_arg)
4120 {
4121 if (!optional_arg)
4122 {
4123 error_str= "missing function argument";
4124 DBUG_PRINT("info", ("parse error %s", error_str));
4125 break;
4126 }
4127 else
4128 {
4129 /* Arg was optional, and not present
4130 * Must be at end of args, finish parsing
4131 */
4132 args[no_args].type= CFAT_END;
4133 error_str= NULL;
4134 break;
4135 }
4136 }
4137
4138 uint len= end_arg - start_arg;
4139 args[no_args].type= type;
4140 args[no_args].ptr= start_arg;
4141 args[no_args].len= len;
4142 args[no_args].fieldno= (uint32)-1;
4143
4144 DBUG_PRINT("info", ("found argument %s %u", start_arg, len));
4145
4146 bool arg_processing_error = false;
4147 switch (type)
4148 {
4149 case CFAT_COLUMN_NAME:
4150 {
4151 /* find column in table */
4152 DBUG_PRINT("info", ("searching for %s %u", start_arg, len));
4153 TABLE_SHARE *table_s= table->s;
4154 for (uint j= 0; j < table_s->fields; j++)
4155 {
4156 Field *field= table_s->field[j];
4157 if (strncmp(start_arg, field->field_name, len) == 0 &&
4158 field->field_name[len] == '\0')
4159 {
4160 DBUG_PRINT("info", ("found %s", field->field_name));
4161 args[no_args].fieldno= j;
4162 break;
4163 }
4164 }
4165 break;
4166 }
4167 case CFAT_EXTRA_GCI_BITS:
4168 {
4169 /* Map string to number and check it's in range etc */
4170 char* end_of_arg = (char*) end_arg;
4171 Uint32 bits = strtoul(start_arg, &end_of_arg, 0);
4172 DBUG_PRINT("info", ("Using %u as the number of extra bits", bits));
4173
4174 if (bits > 31)
4175 {
4176 arg_processing_error= true;
4177 error_str= "Too many extra Gci bits";
4178 DBUG_PRINT("info", ("%s", error_str));
4179 break;
4180 }
4181 /* Num bits seems ok */
4182 args[no_args].extraGciBits = bits;
4183 break;
4184 }
4185 case CFAT_END:
4186 abort();
4187 }
4188
4189 if (arg_processing_error)
4190 break;
4191 no_args++;
4192 }
4193
4194 if (error_str)
4195 break;
4196
4197 /* remove whitespace */
4198 while (*ptr == ' ' && *ptr != '\0') ptr++;
4199
4200 /* next ')' */
4201 if (*ptr != ')')
4202 {
4203 error_str= "missing ')'";
4204 break;
4205 }
4206 ptr++;
4207
4208 /* remove whitespace */
4209 while (*ptr == ' ' && *ptr != '\0') ptr++;
4210
4211 /* garbage in the end? */
4212 if (*ptr != '\0')
4213 {
4214 error_str= "garbage in the end";
4215 break;
4216 }
4217
4218 /* Update ptrs to conflict fn + # of args */
4219 *conflict_fn = &conflict_fns[i];
4220 *max_args = no_args;
4221
4222 DBUG_RETURN(0);
4223 }
4224 /* parse error */
4225 my_snprintf(msg, msg_len, "%s, %s at '%s'",
4226 conflict_fn_spec, error_str, ptr);
4227 DBUG_PRINT("info", ("%s", msg));
4228 DBUG_RETURN(-1);
4229 }
4230
4231 static int
setup_conflict_fn(THD * thd,NDB_SHARE * share,const NDBTAB * ndbtab,char * msg,uint msg_len,TABLE * table,const st_conflict_fn_def * conflict_fn,const st_conflict_fn_arg * args,const Uint32 num_args)4232 setup_conflict_fn(THD *thd, NDB_SHARE *share,
4233 const NDBTAB *ndbtab,
4234 char *msg, uint msg_len,
4235 TABLE *table,
4236 const st_conflict_fn_def* conflict_fn,
4237 const st_conflict_fn_arg* args,
4238 const Uint32 num_args)
4239 {
4240 DBUG_ENTER("setup_conflict_fn");
4241
4242 /* setup the function */
4243 switch (conflict_fn->type)
4244 {
4245 case CFT_NDB_MAX:
4246 case CFT_NDB_OLD:
4247 case CFT_NDB_MAX_DEL_WIN:
4248 {
4249 if (num_args != 1)
4250 {
4251 my_snprintf(msg, msg_len,
4252 "Incorrect arguments to conflict function");
4253 DBUG_PRINT("info", ("%s", msg));
4254 DBUG_RETURN(-1);
4255 }
4256
4257 uint resolve_col_sz= 0;
4258
4259 if (0 == (resolve_col_sz =
4260 slave_check_resolve_col_type(ndbtab, args[0].fieldno)))
4261 {
4262 /* wrong data type */
4263 slave_reset_conflict_fn(share);
4264 my_snprintf(msg, msg_len,
4265 "column '%s' has wrong datatype",
4266 table->s->field[args[0].fieldno]->field_name);
4267 DBUG_PRINT("info", ("%s", msg));
4268 DBUG_RETURN(-1);
4269 }
4270
4271 if (slave_set_resolve_fn(thd, share, ndbtab,
4272 args[0].fieldno, resolve_col_sz,
4273 conflict_fn, table, CFF_NONE))
4274 {
4275 my_snprintf(msg, msg_len,
4276 "unable to setup conflict resolution using column '%s'",
4277 table->s->field[args[0].fieldno]->field_name);
4278 DBUG_PRINT("info", ("%s", msg));
4279 DBUG_RETURN(-1);
4280 }
4281 if (opt_ndb_extra_logging)
4282 {
4283 sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
4284 table->s->db.str,
4285 table->s->table_name.str,
4286 conflict_fn->name,
4287 table->s->field[args[0].fieldno]->field_name);
4288 }
4289 break;
4290 }
4291 case CFT_NDB_EPOCH:
4292 {
4293 if (num_args > 1)
4294 {
4295 my_snprintf(msg, msg_len,
4296 "Too many arguments to conflict function");
4297 DBUG_PRINT("info", ("%s", msg));
4298 DBUG_RETURN(-1);
4299 }
4300
4301 /* Check that table doesn't have Blobs as we don't support that */
4302 if (share->flags & NSF_BLOB_FLAG)
4303 {
4304 my_snprintf(msg, msg_len, "Table has Blob column(s), not suitable for NDB$EPOCH.");
4305 DBUG_PRINT("info", ("%s", msg));
4306 DBUG_RETURN(-1);
4307 }
4308
4309 /* Check that table has required extra meta-columns */
4310 /* Todo : Could warn if extra gcibits is insufficient to
4311 * represent SavePeriod/EpochPeriod
4312 */
4313 if (ndbtab->getExtraRowGciBits() == 0)
4314 sql_print_information("Ndb Slave : CFT_NDB_EPOCH, low epoch resolution");
4315
4316 if (ndbtab->getExtraRowAuthorBits() == 0)
4317 {
4318 my_snprintf(msg, msg_len, "No extra row author bits in table.");
4319 DBUG_PRINT("info", ("%s", msg));
4320 DBUG_RETURN(-1);
4321 }
4322
4323 if (slave_set_resolve_fn(thd, share, ndbtab,
4324 0, // field_no
4325 0, // resolve_col_sz
4326 conflict_fn, table, CFF_REFRESH_ROWS))
4327 {
4328 my_snprintf(msg, msg_len,
4329 "unable to setup conflict resolution");
4330 DBUG_PRINT("info", ("%s", msg));
4331 DBUG_RETURN(-1);
4332 }
4333 if (opt_ndb_extra_logging)
4334 {
4335 sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s.",
4336 table->s->db.str,
4337 table->s->table_name.str,
4338 conflict_fn->name);
4339 }
4340 break;
4341 }
4342 case CFT_NUMBER_OF_CFTS:
4343 case CFT_NDB_UNDEF:
4344 abort();
4345 }
4346 DBUG_RETURN(0);
4347 }
4348
4349 static const char *ndb_rep_db= NDB_REP_DB;
4350 static const char *ndb_replication_table= NDB_REPLICATION_TABLE;
4351 static const char *nrt_db= "db";
4352 static const char *nrt_table_name= "table_name";
4353 static const char *nrt_server_id= "server_id";
4354 static const char *nrt_binlog_type= "binlog_type";
4355 static const char *nrt_conflict_fn= "conflict_fn";
4356
4357 /*
4358 ndbcluster_read_replication_table
4359
4360 This function reads the information for the supplied table from
4361 the mysql.ndb_replication table.
4362 Where there is no information (or no table), defaults are
4363 returned.
4364 */
4365 int
ndbcluster_read_replication_table(THD * thd,Ndb * ndb,const char * db,const char * table_name,uint server_id,Uint32 * binlog_flags,char ** conflict_fn_spec,char * conflict_fn_buffer,Uint32 conflict_fn_buffer_len)4366 ndbcluster_read_replication_table(THD *thd, Ndb *ndb,
4367 const char* db,
4368 const char* table_name,
4369 uint server_id,
4370 Uint32* binlog_flags,
4371 char** conflict_fn_spec,
4372 char* conflict_fn_buffer,
4373 Uint32 conflict_fn_buffer_len)
4374 {
4375 DBUG_ENTER("ndbcluster_read_replication_table");
4376 NdbError ndberror;
4377 int error= 0;
4378 const char *error_str= "<none>";
4379
4380 ndb->setDatabaseName(ndb_rep_db);
4381 NDBDICT *dict= ndb->getDictionary();
4382 Ndb_table_guard ndbtab_g(dict, ndb_replication_table);
4383 const NDBTAB *reptab= ndbtab_g.get_table();
4384 if (reptab == NULL &&
4385 (dict->getNdbError().classification == NdbError::SchemaError ||
4386 dict->getNdbError().code == 4009))
4387 {
4388 DBUG_PRINT("info", ("No %s.%s table", ndb_rep_db, ndb_replication_table));
4389 *binlog_flags= NBT_DEFAULT;
4390 *conflict_fn_spec= NULL;
4391 DBUG_RETURN(0);
4392 }
4393 const NDBCOL
4394 *col_db, *col_table_name, *col_server_id, *col_binlog_type, *col_conflict_fn;
4395 char tmp_buf[FN_REFLEN];
4396 uint retries= 100;
4397 int retry_sleep= 30; /* 30 milliseconds, transaction */
4398 if (reptab == NULL)
4399 {
4400 ndberror= dict->getNdbError();
4401 goto err;
4402 }
4403 if (reptab->getNoOfPrimaryKeys() != 3)
4404 {
4405 error= -2;
4406 error_str= "Wrong number of primary key parts, expected 3";
4407 goto err;
4408 }
4409 error= -1;
4410 col_db= reptab->getColumn(error_str= nrt_db);
4411 if (col_db == NULL ||
4412 !col_db->getPrimaryKey() ||
4413 col_db->getType() != NDBCOL::Varbinary)
4414 goto err;
4415 col_table_name= reptab->getColumn(error_str= nrt_table_name);
4416 if (col_table_name == NULL ||
4417 !col_table_name->getPrimaryKey() ||
4418 col_table_name->getType() != NDBCOL::Varbinary)
4419 goto err;
4420 col_server_id= reptab->getColumn(error_str= nrt_server_id);
4421 if (col_server_id == NULL ||
4422 !col_server_id->getPrimaryKey() ||
4423 col_server_id->getType() != NDBCOL::Unsigned)
4424 goto err;
4425 col_binlog_type= reptab->getColumn(error_str= nrt_binlog_type);
4426 if (col_binlog_type == NULL ||
4427 col_binlog_type->getPrimaryKey() ||
4428 col_binlog_type->getType() != NDBCOL::Unsigned)
4429 goto err;
4430 col_conflict_fn= reptab->getColumn(error_str= nrt_conflict_fn);
4431 if (col_conflict_fn == NULL)
4432 {
4433 col_conflict_fn= NULL;
4434 }
4435 else if (col_conflict_fn->getPrimaryKey() ||
4436 col_conflict_fn->getType() != NDBCOL::Varbinary)
4437 goto err;
4438
4439 error= 0;
4440 for (;;)
4441 {
4442 NdbTransaction *trans= ndb->startTransaction();
4443 if (trans == NULL)
4444 {
4445 ndberror= ndb->getNdbError();
4446 break;
4447 }
4448 NdbRecAttr *col_binlog_type_rec_attr[2];
4449 NdbRecAttr *col_conflict_fn_rec_attr[2]= {NULL, NULL};
4450 uint32 ndb_binlog_type[2];
4451 const uint sz= 256;
4452 char ndb_conflict_fn_buf[2*sz];
4453 char *ndb_conflict_fn[2]= {ndb_conflict_fn_buf, ndb_conflict_fn_buf+sz};
4454 NdbOperation *op[2];
4455 uint32 i, id= 0;
4456 /* Read generic row (server_id==0) and specific row (server_id == our id)
4457 * from ndb_replication.
4458 * Specific overrides generic, if present
4459 */
4460 for (i= 0; i < 2; i++)
4461 {
4462 NdbOperation *_op;
4463 DBUG_PRINT("info", ("reading[%u]: %s,%s,%u", i, db, table_name, id));
4464 if ((_op= trans->getNdbOperation(reptab)) == NULL) abort();
4465 if (_op->readTuple(NdbOperation::LM_CommittedRead)) abort();
4466 ndb_pack_varchar(col_db, tmp_buf, db, strlen(db));
4467 if (_op->equal(col_db->getColumnNo(), tmp_buf)) abort();
4468 ndb_pack_varchar(col_table_name, tmp_buf, table_name, strlen(table_name));
4469 if (_op->equal(col_table_name->getColumnNo(), tmp_buf)) abort();
4470 if (_op->equal(col_server_id->getColumnNo(), id)) abort();
4471 if ((col_binlog_type_rec_attr[i]=
4472 _op->getValue(col_binlog_type, (char *)&(ndb_binlog_type[i]))) == 0) abort();
4473 /* optional columns */
4474 if (col_conflict_fn)
4475 {
4476 if ((col_conflict_fn_rec_attr[i]=
4477 _op->getValue(col_conflict_fn, ndb_conflict_fn[i])) == 0) abort();
4478 }
4479 id= server_id;
4480 op[i]= _op;
4481 }
4482
4483 if (trans->execute(NdbTransaction::Commit,
4484 NdbOperation::AO_IgnoreError))
4485 {
4486 if (ndb->getNdbError().status == NdbError::TemporaryError)
4487 {
4488 if (retries--)
4489 {
4490 if (trans)
4491 ndb->closeTransaction(trans);
4492 do_retry_sleep(retry_sleep);
4493 continue;
4494 }
4495 }
4496 ndberror= trans->getNdbError();
4497 ndb->closeTransaction(trans);
4498 break;
4499 }
4500 for (i= 0; i < 2; i++)
4501 {
4502 if (op[i]->getNdbError().code)
4503 {
4504 if (op[i]->getNdbError().classification == NdbError::NoDataFound)
4505 {
4506 col_binlog_type_rec_attr[i]= NULL;
4507 col_conflict_fn_rec_attr[i]= NULL;
4508 DBUG_PRINT("info", ("not found row[%u]", i));
4509 continue;
4510 }
4511 ndberror= op[i]->getNdbError();
4512 break;
4513 }
4514 DBUG_PRINT("info", ("found row[%u]", i));
4515 }
4516 if (col_binlog_type_rec_attr[1] == NULL ||
4517 col_binlog_type_rec_attr[1]->isNULL())
4518 {
4519 /* No specific value, use generic */
4520 col_binlog_type_rec_attr[1]= col_binlog_type_rec_attr[0];
4521 ndb_binlog_type[1]= ndb_binlog_type[0];
4522 }
4523 if (col_conflict_fn_rec_attr[1] == NULL ||
4524 col_conflict_fn_rec_attr[1]->isNULL())
4525 {
4526 /* No specific value, use generic */
4527 col_conflict_fn_rec_attr[1]= col_conflict_fn_rec_attr[0];
4528 ndb_conflict_fn[1]= ndb_conflict_fn[0];
4529 }
4530
4531 if (col_binlog_type_rec_attr[1] == NULL ||
4532 col_binlog_type_rec_attr[1]->isNULL())
4533 {
4534 DBUG_PRINT("info", ("No binlog flag value, using default"));
4535 /* No value */
4536 *binlog_flags= NBT_DEFAULT;
4537 }
4538 else
4539 {
4540 DBUG_PRINT("info", ("Taking binlog flag value from the table"));
4541 *binlog_flags= (enum Ndb_binlog_type) ndb_binlog_type[1];
4542 }
4543
4544 if (col_conflict_fn_rec_attr[1] == NULL ||
4545 col_conflict_fn_rec_attr[1]->isNULL())
4546 {
4547 /* No conflict function */
4548 *conflict_fn_spec = NULL;
4549 }
4550 else
4551 {
4552 const char* conflict_fn = ndb_conflict_fn[1];
4553 uint len= 0;
4554 switch (col_conflict_fn->getArrayType())
4555 {
4556 case NDBCOL::ArrayTypeShortVar:
4557 len= *(uchar*)conflict_fn;
4558 conflict_fn++;
4559 break;
4560 case NDBCOL::ArrayTypeMediumVar:
4561 len= uint2korr(conflict_fn);
4562 conflict_fn+= 2;
4563 break;
4564 default:
4565 abort();
4566 }
4567 if ((len + 1) > conflict_fn_buffer_len)
4568 {
4569 ndb->closeTransaction(trans);
4570 error= -2;
4571 error_str= "Conflict function specification too long.";
4572 goto err;
4573 }
4574 memcpy(conflict_fn_buffer, conflict_fn, len);
4575 conflict_fn_buffer[len] = '\0';
4576 *conflict_fn_spec = conflict_fn_buffer;
4577 }
4578
4579 DBUG_PRINT("info", ("Retrieved Binlog flags : %u and function spec : %s",
4580 *binlog_flags, (*conflict_fn_spec != NULL ?*conflict_fn_spec:
4581 "NULL")));
4582
4583 ndb->closeTransaction(trans);
4584
4585 DBUG_RETURN(0);
4586 }
4587
4588 err:
4589 DBUG_PRINT("info", ("error %d, error_str %s, ndberror.code %u",
4590 error, error_str, ndberror.code));
4591 if (error < 0)
4592 {
4593 char msg[FN_REFLEN];
4594 switch (error)
4595 {
4596 case -1:
4597 my_snprintf(msg, sizeof(msg),
4598 "Missing or wrong type for column '%s'", error_str);
4599 break;
4600 case -2:
4601 my_snprintf(msg, sizeof(msg), "%s", error_str);
4602 break;
4603 default:
4604 abort();
4605 }
4606 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4607 ER_NDB_REPLICATION_SCHEMA_ERROR,
4608 ER(ER_NDB_REPLICATION_SCHEMA_ERROR),
4609 msg);
4610 }
4611 else
4612 {
4613 char msg[FN_REFLEN];
4614 my_snprintf(tmp_buf, sizeof(tmp_buf), "ndberror %u", ndberror.code);
4615 my_snprintf(msg, sizeof(msg), "Unable to retrieve %s.%s, logging and "
4616 "conflict resolution may not function as intended (%s)",
4617 ndb_rep_db, ndb_replication_table, tmp_buf);
4618 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4619 ER_ILLEGAL_HA_CREATE_OPTION,
4620 ER(ER_ILLEGAL_HA_CREATE_OPTION),
4621 ndbcluster_hton_name, msg);
4622 }
4623 *binlog_flags= NBT_DEFAULT;
4624 *conflict_fn_spec= NULL;
4625
4626 if (ndberror.code && opt_ndb_extra_logging)
4627 print_warning_list("NDB", thd);
4628 DBUG_RETURN(ndberror.code);
4629 }
4630
4631 /*
4632 ndbcluster_get_binlog_replication_info
4633
4634 This function retrieves the data for the given table
4635 from the ndb_replication table.
4636
4637 If the table is not found, or the table does not exist,
4638 then defaults are returned.
4639 */
4640 int
ndbcluster_get_binlog_replication_info(THD * thd,Ndb * ndb,const char * db,const char * table_name,uint server_id,const TABLE * table,Uint32 * binlog_flags,const st_conflict_fn_def ** conflict_fn,st_conflict_fn_arg * args,Uint32 * num_args)4641 ndbcluster_get_binlog_replication_info(THD *thd, Ndb *ndb,
4642 const char* db,
4643 const char* table_name,
4644 uint server_id,
4645 const TABLE *table,
4646 Uint32* binlog_flags,
4647 const st_conflict_fn_def** conflict_fn,
4648 st_conflict_fn_arg* args,
4649 Uint32* num_args)
4650 {
4651 DBUG_ENTER("ndbcluster_get_binlog_replication_info");
4652
4653 /* Override for ndb_apply_status when logging */
4654 if (opt_ndb_log_apply_status)
4655 {
4656 if (strcmp(db, NDB_REP_DB) == 0 &&
4657 strcmp(table_name, NDB_APPLY_TABLE) == 0)
4658 {
4659 /*
4660 Ensure that we get all columns from ndb_apply_status updates
4661 by forcing FULL event type
4662 Also, ensure that ndb_apply_status events are always logged as
4663 WRITES.
4664 */
4665 DBUG_PRINT("info", ("ndb_apply_status defaulting to FULL, USE_WRITE"));
4666 sql_print_information("NDB : ndb-log-apply-status forcing "
4667 "%s.%s to FULL USE_WRITE",
4668 NDB_REP_DB, NDB_APPLY_TABLE);
4669 *binlog_flags = NBT_FULL;
4670 *conflict_fn = NULL;
4671 *num_args = 0;
4672 DBUG_RETURN(0);
4673 }
4674 }
4675
4676 const Uint32 MAX_CONFLICT_FN_SPEC_LEN = 256;
4677 char conflict_fn_buffer[MAX_CONFLICT_FN_SPEC_LEN];
4678 char* conflict_fn_spec;
4679
4680 if (ndbcluster_read_replication_table(thd,
4681 ndb,
4682 db,
4683 table_name,
4684 server_id,
4685 binlog_flags,
4686 &conflict_fn_spec,
4687 conflict_fn_buffer,
4688 MAX_CONFLICT_FN_SPEC_LEN) != 0)
4689 {
4690 DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
4691 }
4692
4693 if (table != NULL)
4694 {
4695 if (conflict_fn_spec != NULL)
4696 {
4697 char tmp_buf[FN_REFLEN];
4698
4699 if (parse_conflict_fn_spec(conflict_fn_spec,
4700 conflict_fn,
4701 args,
4702 num_args,
4703 table,
4704 tmp_buf,
4705 sizeof(tmp_buf)) != 0)
4706 {
4707 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4708 ER_CONFLICT_FN_PARSE_ERROR,
4709 ER(ER_CONFLICT_FN_PARSE_ERROR),
4710 tmp_buf);
4711 DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
4712 }
4713 }
4714 else
4715 {
4716 /* No conflict function specified */
4717 conflict_fn= NULL;
4718 num_args= 0;
4719 }
4720 }
4721
4722 DBUG_RETURN(0);
4723 }
4724
4725 int
ndbcluster_apply_binlog_replication_info(THD * thd,NDB_SHARE * share,const NDBTAB * ndbtab,TABLE * table,const st_conflict_fn_def * conflict_fn,const st_conflict_fn_arg * args,Uint32 num_args,bool do_set_binlog_flags,Uint32 binlog_flags)4726 ndbcluster_apply_binlog_replication_info(THD *thd,
4727 NDB_SHARE *share,
4728 const NDBTAB* ndbtab,
4729 TABLE* table,
4730 const st_conflict_fn_def* conflict_fn,
4731 const st_conflict_fn_arg* args,
4732 Uint32 num_args,
4733 bool do_set_binlog_flags,
4734 Uint32 binlog_flags)
4735 {
4736 DBUG_ENTER("ndbcluster_apply_binlog_replication_info");
4737 char tmp_buf[FN_REFLEN];
4738
4739 if (do_set_binlog_flags)
4740 {
4741 DBUG_PRINT("info", ("Setting binlog flags to %u", binlog_flags));
4742 set_binlog_flags(share, (enum Ndb_binlog_type)binlog_flags);
4743 }
4744
4745 if (conflict_fn != NULL)
4746 {
4747 if (setup_conflict_fn(thd, share,
4748 ndbtab,
4749 tmp_buf, sizeof(tmp_buf),
4750 table,
4751 conflict_fn,
4752 args,
4753 num_args) != 0)
4754 {
4755 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
4756 ER_CONFLICT_FN_PARSE_ERROR,
4757 ER(ER_CONFLICT_FN_PARSE_ERROR),
4758 tmp_buf);
4759 DBUG_RETURN(-1);
4760 }
4761 }
4762 else
4763 {
4764 /* No conflict function specified */
4765 slave_reset_conflict_fn(share);
4766 }
4767
4768 DBUG_RETURN(0);
4769 }
4770
4771 int
ndbcluster_read_binlog_replication(THD * thd,Ndb * ndb,NDB_SHARE * share,const NDBTAB * ndbtab,uint server_id,TABLE * table,bool do_set_binlog_flags)4772 ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
4773 NDB_SHARE *share,
4774 const NDBTAB *ndbtab,
4775 uint server_id,
4776 TABLE *table,
4777 bool do_set_binlog_flags)
4778 {
4779 DBUG_ENTER("ndbcluster_read_binlog_replication");
4780 Uint32 binlog_flags;
4781 const st_conflict_fn_def* conflict_fn= NULL;
4782 st_conflict_fn_arg args[MAX_CONFLICT_ARGS];
4783 Uint32 num_args = MAX_CONFLICT_ARGS;
4784
4785 if ((ndbcluster_get_binlog_replication_info(thd, ndb,
4786 share->db,
4787 share->table_name,
4788 server_id,
4789 table,
4790 &binlog_flags,
4791 &conflict_fn,
4792 args,
4793 &num_args) != 0) ||
4794 (ndbcluster_apply_binlog_replication_info(thd,
4795 share,
4796 ndbtab,
4797 table,
4798 conflict_fn,
4799 args,
4800 num_args,
4801 do_set_binlog_flags,
4802 binlog_flags) != 0))
4803 {
4804 DBUG_RETURN(-1);
4805 }
4806
4807 DBUG_RETURN(0);
4808 }
4809 #endif /* HAVE_NDB_BINLOG */
4810
4811 bool
ndbcluster_check_if_local_table(const char * dbname,const char * tabname)4812 ndbcluster_check_if_local_table(const char *dbname, const char *tabname)
4813 {
4814 char key[FN_REFLEN + 1];
4815 char ndb_file[FN_REFLEN + 1];
4816
4817 DBUG_ENTER("ndbcluster_check_if_local_table");
4818 build_table_filename(key, FN_LEN-1, dbname, tabname, reg_ext, 0);
4819 build_table_filename(ndb_file, FN_LEN-1, dbname, tabname, ha_ndb_ext, 0);
4820 /* Check that any defined table is an ndb table */
4821 DBUG_PRINT("info", ("Looking for file %s and %s", key, ndb_file));
4822 if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK))
4823 {
4824 DBUG_PRINT("info", ("table file %s not on disk, local table", ndb_file));
4825
4826
4827 DBUG_RETURN(true);
4828 }
4829
4830 DBUG_RETURN(false);
4831 }
4832
4833 bool
ndbcluster_check_if_local_tables_in_db(THD * thd,const char * dbname)4834 ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname)
4835 {
4836 DBUG_ENTER("ndbcluster_check_if_local_tables_in_db");
4837 DBUG_PRINT("info", ("Looking for files in directory %s", dbname));
4838 LEX_STRING *tabname;
4839 List<LEX_STRING> files;
4840 char path[FN_REFLEN + 1];
4841
4842 build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0);
4843 if (find_files(thd, &files, dbname, path, NullS, 0, NULL) !=
4844 FIND_FILES_OK)
4845 {
4846 thd->clear_error();
4847 DBUG_PRINT("info", ("Failed to find files"));
4848 DBUG_RETURN(true);
4849 }
4850 DBUG_PRINT("info",("found: %d files", files.elements));
4851 while ((tabname= files.pop()))
4852 {
4853 DBUG_PRINT("info", ("Found table %s", tabname->str));
4854 if (ndbcluster_check_if_local_table(dbname, tabname->str))
4855 DBUG_RETURN(true);
4856 }
4857
4858 DBUG_RETURN(false);
4859 }
4860
4861 /*
4862 Common function for setting up everything for logging a table at
4863 create/discover.
4864 */
ndbcluster_create_binlog_setup(THD * thd,Ndb * ndb,const char * key,uint key_len,const char * db,const char * table_name,TABLE * table)4865 int ndbcluster_create_binlog_setup(THD *thd, Ndb *ndb, const char *key,
4866 uint key_len,
4867 const char *db,
4868 const char *table_name,
4869 TABLE * table)
4870 {
4871 int do_event_op= ndb_binlog_running;
4872 DBUG_ENTER("ndbcluster_create_binlog_setup");
4873 DBUG_PRINT("enter",("key: %s key_len: %d %s.%s",
4874 key, key_len, db, table_name));
4875 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));
4876 DBUG_ASSERT(strlen(key) == key_len);
4877
4878 pthread_mutex_lock(&ndbcluster_mutex);
4879 NDB_SHARE * share = get_share(key, table, TRUE, TRUE);
4880 if (share == 0)
4881 {
4882 /**
4883 * Failed to create share
4884 */
4885 pthread_mutex_unlock(&ndbcluster_mutex);
4886 DBUG_RETURN(-1);
4887 }
4888 pthread_mutex_unlock(&ndbcluster_mutex);
4889
4890 pthread_mutex_lock(&share->mutex);
4891 if (get_binlog_nologging(share) || share->op != 0 || share->new_op != 0)
4892 {
4893 pthread_mutex_unlock(&share->mutex);
4894 free_share(&share);
4895 DBUG_RETURN(0); // replication already setup, or should not
4896 }
4897
4898 if (Ndb_dist_priv_util::is_distributed_priv_table(db, table_name))
4899 {
4900 // The distributed privilege tables are distributed by writing
4901 // the CREATE USER, GRANT, REVOKE etc. to ndb_schema -> no need
4902 // to listen to events from this table
4903 DBUG_PRINT("info", ("Skipping binlogging of table %s/%s", db, table_name));
4904 do_event_op= 0;
4905 }
4906
4907 if (!ndb_schema_share &&
4908 strcmp(share->db, NDB_REP_DB) == 0 &&
4909 strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
4910 do_event_op= 1;
4911 else if (!ndb_apply_status_share &&
4912 strcmp(share->db, NDB_REP_DB) == 0 &&
4913 strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
4914 do_event_op= 1;
4915
4916 if (!do_event_op)
4917 {
4918 set_binlog_nologging(share);
4919 pthread_mutex_unlock(&share->mutex);
4920 DBUG_RETURN(0);
4921 }
4922
4923 while (share && !IS_TMP_PREFIX(table_name))
4924 {
4925 /*
4926 ToDo make sanity check of share so that the table is actually the same
4927 I.e. we need to do open file from frm in this case
4928 Currently awaiting this to be fixed in the 4.1 tree in the general
4929 case
4930 */
4931
4932 /* Create the event in NDB */
4933 ndb->setDatabaseName(db);
4934
4935 NDBDICT *dict= ndb->getDictionary();
4936 Ndb_table_guard ndbtab_g(dict, table_name);
4937 const NDBTAB *ndbtab= ndbtab_g.get_table();
4938 if (ndbtab == 0)
4939 {
4940 if (opt_ndb_extra_logging)
4941 sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
4942 "%s, %d", key, dict->getNdbError().message,
4943 dict->getNdbError().code);
4944 break; // error
4945 }
4946 #ifdef HAVE_NDB_BINLOG
4947 /*
4948 */
4949 ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab,
4950 ::server_id, NULL, TRUE);
4951 #endif
4952 /*
4953 check if logging turned off for this table
4954 */
4955 if (get_binlog_nologging(share))
4956 {
4957 if (opt_ndb_extra_logging)
4958 sql_print_information("NDB Binlog: NOT logging %s", share->key);
4959 pthread_mutex_unlock(&share->mutex);
4960 DBUG_RETURN(0);
4961 }
4962
4963 String event_name(INJECTOR_EVENT_LEN);
4964 ndb_rep_event_name(&event_name, db, table_name, get_binlog_full(share));
4965 /*
4966 event should have been created by someone else,
4967 but let's make sure, and create if it doesn't exist
4968 */
4969 const NDBEVENT *ev= dict->getEvent(event_name.c_ptr());
4970 if (!ev)
4971 {
4972 if (ndbcluster_create_event(thd, ndb, ndbtab, event_name.c_ptr(), share))
4973 {
4974 sql_print_error("NDB Binlog: "
4975 "FAILED CREATE (DISCOVER) TABLE Event: %s",
4976 event_name.c_ptr());
4977 break; // error
4978 }
4979 if (opt_ndb_extra_logging)
4980 sql_print_information("NDB Binlog: "
4981 "CREATE (DISCOVER) TABLE Event: %s",
4982 event_name.c_ptr());
4983 }
4984 else
4985 {
4986 delete ev;
4987 if (opt_ndb_extra_logging)
4988 sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
4989 event_name.c_ptr());
4990 }
4991
4992 /*
4993 create the event operations for receiving logging events
4994 */
4995 if (ndbcluster_create_event_ops(thd, share,
4996 ndbtab, event_name.c_ptr()))
4997 {
4998 sql_print_error("NDB Binlog:"
4999 "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
5000 event_name.c_ptr());
5001 /* a warning has been issued to the client */
5002 break;
5003 }
5004 pthread_mutex_unlock(&share->mutex);
5005 DBUG_RETURN(0);
5006 }
5007
5008 pthread_mutex_unlock(&share->mutex);
5009 free_share(&share);
5010 DBUG_RETURN(-1);
5011 }
5012
/*
  ndbcluster_create_event

  Create the NDB event event_name on table ndbtab, for binlogging the
  table described by share. The event subscribes to all table events
  (TE_ALL) and to all columns.

  Report mode:
    - Hidden primary key: ER_ALL. A table with BLOBs and no PK is
      rejected and marked NSF_NO_BINLOG.
    - NDB_REP_DB.NDB_SCHEMA_TABLE: ER_ALL | ER_SUBSCRIBE.
    - Otherwise: ER_ALL if get_binlog_full(share), else ER_UPDATED.
    ER_DDL is always included. Tables with BLOBs use mergeEvents(TRUE).

  If createEvent() fails because the event already exists, the existing
  event is kept when getEvent() can retrieve it. Otherwise, if getEvent()
  failed with NDB_INVALID_SCHEMA_OBJECT, the stale event is dropped.
  createEvent() is then tried once more.

  Parameters:
    thd           Thread used for pushing warnings
    ndb           Ndb object. Its current database is set to share->db.
    ndbtab        Table the event is defined on
    event_name    Name of the event
    share         Share of the table. Nothing is done if NULL.
    push_warning  Non-zero pushes a warning for the BLOB/no-PK case.
                  A value > 1 also pushes warnings for dictionary errors.

  Returns 0 on success, or when share is NULL or binlogging is disabled
  for it. Returns -1 on failure.
*/
int
ndbcluster_create_event(THD *thd, Ndb *ndb, const NDBTAB *ndbtab,
                        const char *event_name, NDB_SHARE *share,
                        int push_warning)
{
  DBUG_ENTER("ndbcluster_create_event");
  DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
                      ndbtab->getName(), ndbtab->getObjectVersion(),
                      event_name, share ? share->key : "(nil)"));
  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
  if (!share)
  {
    DBUG_PRINT("info", ("share == NULL"));
    DBUG_RETURN(0);
  }
  if (get_binlog_nologging(share))
  {
    if (opt_ndb_extra_logging && ndb_binlog_running)
      sql_print_information("NDB Binlog: NOT logging %s", share->key);
    DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x %d",
                        share->flags, share->flags & NSF_NO_BINLOG));
    DBUG_RETURN(0);
  }

  ndb->setDatabaseName(share->db);
  NDBDICT *dict= ndb->getDictionary();
  NDBEVENT my_event(event_name);
  my_event.setTable(*ndbtab);
  my_event.addTableEvent(NDBEVENT::TE_ALL);
  if (share->flags & NSF_HIDDEN_PK)
  {
    if (share->flags & NSF_BLOB_FLAG)
    {
      sql_print_error("NDB Binlog: logging of table %s "
                      "with BLOB attribute and no PK is not supported",
                      share->key);
      if (push_warning)
        push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                            ER_ILLEGAL_HA_CREATE_OPTION,
                            ER(ER_ILLEGAL_HA_CREATE_OPTION),
                            ndbcluster_hton_name,
                            "Binlog of table with BLOB attribute and no PK");

      /* Disable binlogging of this table from now on */
      share->flags|= NSF_NO_BINLOG;
      DBUG_RETURN(-1);
    }
    /* No primary key, subscribe for all attributes */
    my_event.setReport((NDBEVENT::EventReport)
                       (NDBEVENT::ER_ALL | NDBEVENT::ER_DDL));
    DBUG_PRINT("info", ("subscription all"));
  }
  else
  {
    if (strcmp(share->db, NDB_REP_DB) == 0 &&
        strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
    {
      /**
       * ER_SUBSCRIBE is only needed on NDB_SCHEMA_TABLE
       */
      my_event.setReport((NDBEVENT::EventReport)
                         (NDBEVENT::ER_ALL |
                          NDBEVENT::ER_SUBSCRIBE |
                          NDBEVENT::ER_DDL));
      DBUG_PRINT("info", ("subscription all and subscribe"));
    }
    else
    {
      if (get_binlog_full(share))
      {
        my_event.setReport((NDBEVENT::EventReport)
                           (NDBEVENT::ER_ALL | NDBEVENT::ER_DDL));
        DBUG_PRINT("info", ("subscription all"));
      }
      else
      {
        my_event.setReport((NDBEVENT::EventReport)
                           (NDBEVENT::ER_UPDATED | NDBEVENT::ER_DDL));
        DBUG_PRINT("info", ("subscription only updated"));
      }
    }
  }
  if (share->flags & NSF_BLOB_FLAG)
    my_event.mergeEvents(TRUE);

  /* add all columns to the event */
  int n_cols= ndbtab->getNoOfColumns();
  for(int a= 0; a < n_cols; a++)
    my_event.addEventColumn(a);

  if (dict->createEvent(my_event)) // Add event to database
  {
    if (dict->getNdbError().classification != NdbError::SchemaObjectExists)
    {
      /*
        failed, print a warning
      */
      if (push_warning > 1)
        push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                            ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                            dict->getNdbError().code,
                            dict->getNdbError().message, "NDB");
      sql_print_error("NDB Binlog: Unable to create event in database. "
                      "Event: %s Error Code: %d Message: %s", event_name,
                      dict->getNdbError().code, dict->getNdbError().message);
      DBUG_RETURN(-1);
    }

    /*
      try retrieving the event, if table version/id matches, we will get
      a valid event.  Otherwise we have a trailing event from before
    */
    const NDBEVENT *ev;
    if ((ev= dict->getEvent(event_name)))
    {
      delete ev;
      DBUG_RETURN(0);
    }

    /*
      trailing event from before; an error, but try to correct it.
      The drop is only attempted for NDB_INVALID_SCHEMA_OBJECT; in all
      other cases (and after a successful drop) create is retried below.
    */
    if (dict->getNdbError().code == NDB_INVALID_SCHEMA_OBJECT &&
        dict->dropEvent(my_event.getName(), 1))
    {
      if (push_warning > 1)
        push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                            ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                            dict->getNdbError().code,
                            dict->getNdbError().message, "NDB");
      sql_print_error("NDB Binlog: Unable to create event in database. "
                      " Attempt to correct with drop failed. "
                      "Event: %s Error Code: %d Message: %s",
                      event_name,
                      dict->getNdbError().code,
                      dict->getNdbError().message);
      DBUG_RETURN(-1);
    }

    /*
      try to add the event again
    */
    if (dict->createEvent(my_event))
    {
      if (push_warning > 1)
        push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                            ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                            dict->getNdbError().code,
                            dict->getNdbError().message, "NDB");
      sql_print_error("NDB Binlog: Unable to create event in database. "
                      " Attempt to correct with drop ok, but create failed. "
                      "Event: %s Error Code: %d Message: %s",
                      event_name,
                      dict->getNdbError().code,
                      dict->getNdbError().message);
      DBUG_RETURN(-1);
    }
#ifdef NDB_BINLOG_EXTRA_WARNINGS
    push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
                        0, "NDB Binlog: Removed trailing event",
                        "NDB");
#endif
  }

  DBUG_RETURN(0);
}
5179
is_ndb_compatible_type(Field * field)5180 inline int is_ndb_compatible_type(Field *field)
5181 {
5182 return
5183 !(field->flags & BLOB_FLAG) &&
5184 field->type() != MYSQL_TYPE_BIT &&
5185 field->pack_length() != 0;
5186 }
5187
5188 /*
5189 - create eventOperations for receiving log events
5190 - setup ndb recattrs for reception of log event data
5191 - "start" the event operation
5192
5193 used at create/discover of tables
5194 */
5195 int
ndbcluster_create_event_ops(THD * thd,NDB_SHARE * share,const NDBTAB * ndbtab,const char * event_name)5196 ndbcluster_create_event_ops(THD *thd, NDB_SHARE *share,
5197 const NDBTAB *ndbtab, const char *event_name)
5198 {
5199 /*
5200 we are in either create table or rename table so table should be
5201 locked, hence we can work with the share without locks
5202 */
5203
5204 DBUG_ENTER("ndbcluster_create_event_ops");
5205 DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name));
5206 DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
5207
5208 DBUG_ASSERT(share != 0);
5209
5210 if (get_binlog_nologging(share))
5211 {
5212 DBUG_PRINT("info", ("share->flags & NSF_NO_BINLOG, flags: %x",
5213 share->flags));
5214 DBUG_RETURN(0);
5215 }
5216
5217 // Don't allow event ops to be created on distributed priv tables
5218 // they are distributed via ndb_schema
5219 assert(!Ndb_dist_priv_util::is_distributed_priv_table(share->db,
5220 share->table_name));
5221
5222 Ndb_event_data *event_data= share->event_data;
5223 int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0;
5224 #ifdef HAVE_NDB_BINLOG
5225 uint len= strlen(share->table_name);
5226 #endif
5227 if (!ndb_schema_share && strcmp(share->db, NDB_REP_DB) == 0 &&
5228 strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0)
5229 do_ndb_schema_share= 1;
5230 else if (!ndb_apply_status_share && strcmp(share->db, NDB_REP_DB) == 0 &&
5231 strcmp(share->table_name, NDB_APPLY_TABLE) == 0)
5232 do_ndb_apply_status_share= 1;
5233 else
5234 #ifdef HAVE_NDB_BINLOG
5235 if (!binlog_filter->db_ok(share->db) ||
5236 !ndb_binlog_running ||
5237 (len >= sizeof(NDB_EXCEPTIONS_TABLE_SUFFIX) &&
5238 strcmp(share->table_name+len-sizeof(NDB_EXCEPTIONS_TABLE_SUFFIX)+1,
5239 lower_case_table_names ? NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER :
5240 NDB_EXCEPTIONS_TABLE_SUFFIX) == 0))
5241 #endif
5242 {
5243 share->flags|= NSF_NO_BINLOG;
5244 DBUG_RETURN(0);
5245 }
5246
5247 if (share->op)
5248 {
5249 event_data= (Ndb_event_data *) share->op->getCustomData();
5250 assert(event_data->share == share);
5251 assert(share->event_data == 0);
5252
5253 DBUG_ASSERT(share->use_count > 1);
5254 sql_print_error("NDB Binlog: discover reusing old ev op");
5255 /* ndb_share reference ToDo free */
5256 DBUG_PRINT("NDB_SHARE", ("%s ToDo free use_count: %u",
5257 share->key, share->use_count));
5258 free_share(&share); // old event op already has reference
5259 DBUG_RETURN(0);
5260 }
5261
5262 DBUG_ASSERT(event_data != 0);
5263 TABLE *table= event_data->shadow_table;
5264
5265 int retries= 100;
5266 /*
5267 100 milliseconds, temporary error on schema operation can
5268 take some time to be resolved
5269 */
5270 int retry_sleep= 100;
5271 while (1)
5272 {
5273 Mutex_guard injector_mutex_g(injector_mutex);
5274 Ndb *ndb= injector_ndb;
5275 if (do_ndb_schema_share)
5276 ndb= schema_ndb;
5277
5278 if (ndb == 0)
5279 DBUG_RETURN(-1);
5280
5281 NdbEventOperation* op;
5282 if (do_ndb_schema_share)
5283 op= ndb->createEventOperation(event_name);
5284 else
5285 {
5286 // set injector_ndb database/schema from table internal name
5287 int ret= ndb->setDatabaseAndSchemaName(ndbtab);
5288 assert(ret == 0); NDB_IGNORE_VALUE(ret);
5289 op= ndb->createEventOperation(event_name);
5290 // reset to catch errors
5291 ndb->setDatabaseName("");
5292 }
5293 if (!op)
5294 {
5295 sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
5296 " %s",event_name);
5297 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5298 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5299 ndb->getNdbError().code,
5300 ndb->getNdbError().message,
5301 "NDB");
5302 DBUG_RETURN(-1);
5303 }
5304
5305 if (share->flags & NSF_BLOB_FLAG)
5306 op->mergeEvents(TRUE); // currently not inherited from event
5307
5308 uint n_columns= ndbtab->getNoOfColumns();
5309 uint n_fields= table->s->fields;
5310 uint val_length= sizeof(NdbValue) * n_columns;
5311
5312 /*
5313 Allocate memory globally so it can be reused after online alter table
5314 */
5315 if (my_multi_malloc(MYF(MY_WME),
5316 &event_data->ndb_value[0],
5317 val_length,
5318 &event_data->ndb_value[1],
5319 val_length,
5320 NULL) == 0)
5321 {
5322 DBUG_PRINT("info", ("Failed to allocate records for event operation"));
5323 DBUG_RETURN(-1);
5324 }
5325
5326 for (uint j= 0; j < n_columns; j++)
5327 {
5328 const char *col_name= ndbtab->getColumn(j)->getName();
5329 NdbValue attr0, attr1;
5330 if (j < n_fields)
5331 {
5332 Field *f= table->field[j];
5333 if (is_ndb_compatible_type(f))
5334 {
5335 DBUG_PRINT("info", ("%s compatible", col_name));
5336 attr0.rec= op->getValue(col_name, (char*) f->ptr);
5337 attr1.rec= op->getPreValue(col_name,
5338 (f->ptr - table->record[0]) +
5339 (char*) table->record[1]);
5340 }
5341 else if (! (f->flags & BLOB_FLAG))
5342 {
5343 DBUG_PRINT("info", ("%s non compatible", col_name));
5344 attr0.rec= op->getValue(col_name);
5345 attr1.rec= op->getPreValue(col_name);
5346 }
5347 else
5348 {
5349 DBUG_PRINT("info", ("%s blob", col_name));
5350 DBUG_ASSERT(share->flags & NSF_BLOB_FLAG);
5351 attr0.blob= op->getBlobHandle(col_name);
5352 attr1.blob= op->getPreBlobHandle(col_name);
5353 if (attr0.blob == NULL || attr1.blob == NULL)
5354 {
5355 sql_print_error("NDB Binlog: Creating NdbEventOperation"
5356 " blob field %u handles failed (code=%d) for %s",
5357 j, op->getNdbError().code, event_name);
5358 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5359 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5360 op->getNdbError().code,
5361 op->getNdbError().message,
5362 "NDB");
5363 ndb->dropEventOperation(op);
5364 DBUG_RETURN(-1);
5365 }
5366 }
5367 }
5368 else
5369 {
5370 DBUG_PRINT("info", ("%s hidden key", col_name));
5371 attr0.rec= op->getValue(col_name);
5372 attr1.rec= op->getPreValue(col_name);
5373 }
5374 event_data->ndb_value[0][j].ptr= attr0.ptr;
5375 event_data->ndb_value[1][j].ptr= attr1.ptr;
5376 DBUG_PRINT("info", ("&event_data->ndb_value[0][%d]: 0x%lx "
5377 "event_data->ndb_value[0][%d]: 0x%lx",
5378 j, (long) &event_data->ndb_value[0][j],
5379 j, (long) attr0.ptr));
5380 DBUG_PRINT("info", ("&event_data->ndb_value[1][%d]: 0x%lx "
5381 "event_data->ndb_value[1][%d]: 0x%lx",
5382 j, (long) &event_data->ndb_value[0][j],
5383 j, (long) attr1.ptr));
5384 }
5385 op->setCustomData((void *) event_data); // set before execute
5386 share->event_data= 0; // take over event data
5387 share->op= op; // assign op in NDB_SHARE
5388
5389 if (op->execute())
5390 {
5391 share->op= NULL;
5392 retries--;
5393 if (op->getNdbError().status != NdbError::TemporaryError &&
5394 op->getNdbError().code != 1407)
5395 retries= 0;
5396 if (retries == 0)
5397 {
5398 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5399 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5400 op->getNdbError().code, op->getNdbError().message,
5401 "NDB");
5402 sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
5403 event_name,
5404 op->getNdbError().code, op->getNdbError().message);
5405 }
5406 share->event_data= event_data;
5407 op->setCustomData(NULL);
5408 ndb->dropEventOperation(op);
5409 if (retries && !thd->killed)
5410 {
5411 do_retry_sleep(retry_sleep);
5412 continue;
5413 }
5414 DBUG_RETURN(-1);
5415 }
5416 break;
5417 }
5418
5419 /* ndb_share reference binlog */
5420 get_share(share);
5421 DBUG_PRINT("NDB_SHARE", ("%s binlog use_count: %u",
5422 share->key, share->use_count));
5423 if (do_ndb_apply_status_share)
5424 {
5425 /* ndb_share reference binlog extra */
5426 ndb_apply_status_share= get_share(share);
5427 DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
5428 share->key, share->use_count));
5429 (void) pthread_cond_signal(&injector_cond);
5430 }
5431 else if (do_ndb_schema_share)
5432 {
5433 /* ndb_share reference binlog extra */
5434 ndb_schema_share= get_share(share);
5435 DBUG_PRINT("NDB_SHARE", ("%s binlog extra use_count: %u",
5436 share->key, share->use_count));
5437 (void) pthread_cond_signal(&injector_cond);
5438 }
5439
5440 DBUG_PRINT("info",("%s share->op: 0x%lx share->use_count: %u",
5441 share->key, (long) share->op, share->use_count));
5442
5443 if (opt_ndb_extra_logging)
5444 sql_print_information("NDB Binlog: logging %s (%s,%s)", share->key,
5445 get_binlog_full(share) ? "FULL" : "UPDATED",
5446 get_binlog_use_update(share) ? "USE_UPDATE" : "USE_WRITE");
5447 DBUG_RETURN(0);
5448 }
5449
5450 int
ndbcluster_drop_event(THD * thd,Ndb * ndb,NDB_SHARE * share,const char * type_str,const char * dbname,const char * tabname)5451 ndbcluster_drop_event(THD *thd, Ndb *ndb, NDB_SHARE *share,
5452 const char *type_str,
5453 const char *dbname,
5454 const char *tabname)
5455 {
5456 DBUG_ENTER("ndbcluster_drop_event");
5457 /*
5458 There might be 2 types of events setup for the table, we cannot know
5459 which ones are supposed to be there as they may have been created
5460 differently for different mysqld's. So we drop both
5461 */
5462 for (uint i= 0; i < 2; i++)
5463 {
5464 NDBDICT *dict= ndb->getDictionary();
5465 String event_name(INJECTOR_EVENT_LEN);
5466 ndb_rep_event_name(&event_name, dbname, tabname, i);
5467
5468 if (!dict->dropEvent(event_name.c_ptr()))
5469 continue;
5470
5471 if (dict->getNdbError().code != 4710 &&
5472 dict->getNdbError().code != 1419)
5473 {
5474 /* drop event failed for some reason, issue a warning */
5475 push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
5476 ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
5477 dict->getNdbError().code,
5478 dict->getNdbError().message, "NDB");
5479 /* error is not that the event did not exist */
5480 sql_print_error("NDB Binlog: Unable to drop event in database. "
5481 "Event: %s Error Code: %d Message: %s",
5482 event_name.c_ptr(),
5483 dict->getNdbError().code,
5484 dict->getNdbError().message);
5485 /* ToDo; handle error? */
5486 if (share && share->op &&
5487 share->op->getState() == NdbEventOperation::EO_EXECUTING &&
5488 dict->getNdbError().mysql_code != HA_ERR_NO_CONNECTION)
5489 {
5490 DBUG_ASSERT(FALSE);
5491 DBUG_RETURN(-1);
5492 }
5493 }
5494 }
5495 DBUG_RETURN(0);
5496 }
5497
5498 /*
5499 when entering the calling thread should have a share lock id share != 0
5500 then the injector thread will have one as well, i.e. share->use_count == 0
5501 (unless it has already dropped... then share->op == 0)
5502 */
5503
5504 int
ndbcluster_handle_drop_table(THD * thd,Ndb * ndb,NDB_SHARE * share,const char * type_str,const char * dbname,const char * tabname)5505 ndbcluster_handle_drop_table(THD *thd, Ndb *ndb, NDB_SHARE *share,
5506 const char *type_str,
5507 const char * dbname, const char * tabname)
5508 {
5509 DBUG_ENTER("ndbcluster_handle_drop_table");
5510
5511 if (dbname && tabname)
5512 {
5513 if (ndbcluster_drop_event(thd, ndb, share, type_str, dbname, tabname))
5514 DBUG_RETURN(-1);
5515 }
5516
5517 if (share == 0 || share->op == 0)
5518 {
5519 DBUG_RETURN(0);
5520 }
5521
5522 /*
5523 Syncronized drop between client thread and injector thread is
5524 neccessary in order to maintain ordering in the binlog,
5525 such that the drop occurs _after_ any inserts/updates/deletes.
5526
5527 The penalty for this is that the drop table becomes slow.
5528
5529 This wait is however not strictly neccessary to produce a binlog
5530 that is usable. However the slave does not currently handle
5531 these out of order, thus we are keeping the SYNC_DROP_ defined
5532 for now.
5533 */
5534 const char *save_proc_info= thd->proc_info;
5535 #define SYNC_DROP_
5536 #ifdef SYNC_DROP_
5537 thd->proc_info= "Syncing ndb table schema operation and binlog";
5538 pthread_mutex_lock(&share->mutex);
5539 int max_timeout= DEFAULT_SYNC_TIMEOUT;
5540 while (share->op)
5541 {
5542 struct timespec abstime;
5543 set_timespec(abstime, 1);
5544 int ret= pthread_cond_timedwait(&injector_cond,
5545 &share->mutex,
5546 &abstime);
5547 if (thd->killed ||
5548 share->op == 0)
5549 break;
5550 if (ret)
5551 {
5552 max_timeout--;
5553 if (max_timeout == 0)
5554 {
5555 sql_print_error("NDB %s: %s timed out. Ignoring...",
5556 type_str, share->key);
5557 DBUG_ASSERT(false);
5558 break;
5559 }
5560 if (opt_ndb_extra_logging)
5561 ndb_report_waiting(type_str, max_timeout,
5562 type_str, share->key, 0);
5563 }
5564 }
5565 pthread_mutex_unlock(&share->mutex);
5566 #else
5567 pthread_mutex_lock(&share->mutex);
5568 share->op= 0;
5569 pthread_mutex_unlock(&share->mutex);
5570 #endif
5571 thd->proc_info= save_proc_info;
5572
5573 DBUG_RETURN(0);
5574 }
5575
5576
5577 /********************************************************************
5578 Internal helper functions for differentd events from the stoarage nodes
5579 used by the ndb injector thread
5580 ********************************************************************/
5581
5582 /*
5583 Unpack a record read from NDB
5584
5585 SYNOPSIS
5586 ndb_unpack_record()
5587 buf Buffer to store read row
5588
5589 NOTE
5590 The data for each row is read directly into the
5591 destination buffer. This function is primarily
5592 called in order to check if any fields should be
5593 set to null.
5594 */
5595
ndb_unpack_record(TABLE * table,NdbValue * value,MY_BITMAP * defined,uchar * buf)5596 static void ndb_unpack_record(TABLE *table, NdbValue *value,
5597 MY_BITMAP *defined, uchar *buf)
5598 {
5599 Field **p_field= table->field, *field= *p_field;
5600 my_ptrdiff_t row_offset= (my_ptrdiff_t) (buf - table->record[0]);
5601 my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->write_set);
5602 DBUG_ENTER("ndb_unpack_record");
5603
5604 /*
5605 Set the filler bits of the null byte, since they are
5606 not touched in the code below.
5607
5608 The filler bits are the MSBs in the last null byte
5609 */
5610 if (table->s->null_bytes > 0)
5611 buf[table->s->null_bytes - 1]|= 256U - (1U <<
5612 table->s->last_null_bit_pos);
5613 /*
5614 Set null flag(s)
5615 */
5616 for ( ; field;
5617 p_field++, value++, field= *p_field)
5618 {
5619 field->set_notnull(row_offset);
5620 if ((*value).ptr)
5621 {
5622 if (!(field->flags & BLOB_FLAG))
5623 {
5624 int is_null= (*value).rec->isNULL();
5625 if (is_null)
5626 {
5627 if (is_null > 0)
5628 {
5629 DBUG_PRINT("info",("[%u] NULL", field->field_index));
5630 field->set_null(row_offset);
5631 }
5632 else
5633 {
5634 DBUG_PRINT("info",("[%u] UNDEFINED", field->field_index));
5635 bitmap_clear_bit(defined, field->field_index);
5636 }
5637 }
5638 else if (field->type() == MYSQL_TYPE_BIT)
5639 {
5640 Field_bit *field_bit= static_cast<Field_bit*>(field);
5641
5642 /*
5643 Move internal field pointer to point to 'buf'. Calling
5644 the correct member function directly since we know the
5645 type of the object.
5646 */
5647 field_bit->Field_bit::move_field_offset(row_offset);
5648 if (field->pack_length() < 5)
5649 {
5650 DBUG_PRINT("info", ("bit field H'%.8X",
5651 (*value).rec->u_32_value()));
5652 field_bit->Field_bit::store((longlong) (*value).rec->u_32_value(),
5653 TRUE);
5654 }
5655 else
5656 {
5657 DBUG_PRINT("info", ("bit field H'%.8X%.8X",
5658 *(Uint32 *)(*value).rec->aRef(),
5659 *((Uint32 *)(*value).rec->aRef()+1)));
5660 #ifdef WORDS_BIGENDIAN
5661 /* lsw is stored first */
5662 Uint32 *buf= (Uint32 *)(*value).rec->aRef();
5663 field_bit->Field_bit::store((((longlong)*buf)
5664 & 0x00000000FFFFFFFFLL)
5665 |
5666 ((((longlong)*(buf+1)) << 32)
5667 & 0xFFFFFFFF00000000LL),
5668 TRUE);
5669 #else
5670 field_bit->Field_bit::store((longlong)
5671 (*value).rec->u_64_value(), TRUE);
5672 #endif
5673 }
5674 /*
5675 Move back internal field pointer to point to original
5676 value (usually record[0]).
5677 */
5678 field_bit->Field_bit::move_field_offset(-row_offset);
5679 DBUG_PRINT("info",("[%u] SET",
5680 (*value).rec->getColumn()->getColumnNo()));
5681 DBUG_DUMP("info", (const uchar*) field->ptr, field->pack_length());
5682 }
5683 else
5684 {
5685 DBUG_PRINT("info",("[%u] SET",
5686 (*value).rec->getColumn()->getColumnNo()));
5687 DBUG_DUMP("info", (const uchar*) field->ptr, field->pack_length());
5688 }
5689 }
5690 else
5691 {
5692 NdbBlob *ndb_blob= (*value).blob;
5693 uint col_no= field->field_index;
5694 int isNull;
5695 ndb_blob->getDefined(isNull);
5696 if (isNull == 1)
5697 {
5698 DBUG_PRINT("info",("[%u] NULL", col_no));
5699 field->set_null(row_offset);
5700 }
5701 else if (isNull == -1)
5702 {
5703 DBUG_PRINT("info",("[%u] UNDEFINED", col_no));
5704 bitmap_clear_bit(defined, col_no);
5705 }
5706 else
5707 {
5708 #ifndef DBUG_OFF
5709 // pointer vas set in get_ndb_blobs_value
5710 Field_blob *field_blob= (Field_blob*)field;
5711 uchar* ptr;
5712 field_blob->get_ptr(&ptr, row_offset);
5713 uint32 len= field_blob->get_length(row_offset);
5714 DBUG_PRINT("info",("[%u] SET ptr: 0x%lx len: %u",
5715 col_no, (long) ptr, len));
5716 #endif
5717 }
5718 }
5719 }
5720 }
5721 dbug_tmp_restore_column_map(table->write_set, old_map);
5722 DBUG_VOID_RETURN;
5723 }
5724
5725 /*
5726 Handle error states on events from the storage nodes
5727 */
5728 static int
ndb_binlog_thread_handle_error(Ndb * ndb,NdbEventOperation * pOp)5729 ndb_binlog_thread_handle_error(Ndb *ndb,
5730 NdbEventOperation *pOp)
5731 {
5732 Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
5733 NDB_SHARE *share= event_data->share;
5734 DBUG_ENTER("ndb_binlog_thread_handle_error");
5735
5736 int overrun= pOp->isOverrun();
5737 if (overrun)
5738 {
5739 /*
5740 ToDo: this error should rather clear the ndb_binlog_index...
5741 and continue
5742 */
5743 sql_print_error("NDB Binlog: Overrun in event buffer, "
5744 "this means we have dropped events. Cannot "
5745 "continue binlog for %s", share->key);
5746 pOp->clearError();
5747 DBUG_RETURN(-1);
5748 }
5749
5750 if (!pOp->isConsistent())
5751 {
5752 /*
5753 ToDo: this error should rather clear the ndb_binlog_index...
5754 and continue
5755 */
5756 sql_print_error("NDB Binlog: Not Consistent. Cannot "
5757 "continue binlog for %s. Error code: %d"
5758 " Message: %s", share->key,
5759 pOp->getNdbError().code,
5760 pOp->getNdbError().message);
5761 pOp->clearError();
5762 DBUG_RETURN(-1);
5763 }
5764 sql_print_error("NDB Binlog: unhandled error %d for table %s",
5765 pOp->hasError(), share->key);
5766 pOp->clearError();
5767 DBUG_RETURN(0);
5768 }
5769
5770 static int
ndb_binlog_thread_handle_non_data_event(THD * thd,NdbEventOperation * pOp,ndb_binlog_index_row & row)5771 ndb_binlog_thread_handle_non_data_event(THD *thd,
5772 NdbEventOperation *pOp,
5773 ndb_binlog_index_row &row)
5774 {
5775 Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
5776 NDB_SHARE *share= event_data->share;
5777 NDBEVENT::TableEvent type= pOp->getEventType();
5778
5779 switch (type)
5780 {
5781 case NDBEVENT::TE_CLUSTER_FAILURE:
5782 if (opt_ndb_extra_logging)
5783 sql_print_information("NDB Binlog: cluster failure for %s at epoch %u/%u.",
5784 share->key,
5785 (uint)(pOp->getGCI() >> 32),
5786 (uint)(pOp->getGCI()));
5787 if (ndb_apply_status_share == share)
5788 {
5789 if (opt_ndb_extra_logging &&
5790 ndb_binlog_tables_inited && ndb_binlog_running)
5791 sql_print_information("NDB Binlog: ndb tables initially "
5792 "read only on reconnect.");
5793 /* ndb_share reference binlog extra free */
5794 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
5795 share->key, share->use_count));
5796 free_share(&ndb_apply_status_share);
5797 ndb_apply_status_share= 0;
5798 ndb_binlog_tables_inited= FALSE;
5799 }
5800 DBUG_PRINT("error", ("CLUSTER FAILURE EVENT: "
5801 "%s received share: 0x%lx op: 0x%lx share op: 0x%lx "
5802 "new_op: 0x%lx",
5803 share->key, (long) share, (long) pOp,
5804 (long) share->op, (long) share->new_op));
5805 break;
5806 case NDBEVENT::TE_DROP:
5807 if (ndb_apply_status_share == share)
5808 {
5809 if (opt_ndb_extra_logging &&
5810 ndb_binlog_tables_inited && ndb_binlog_running)
5811 sql_print_information("NDB Binlog: ndb tables initially "
5812 "read only on reconnect.");
5813 /* ndb_share reference binlog extra free */
5814 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
5815 share->key, share->use_count));
5816 free_share(&ndb_apply_status_share);
5817 ndb_apply_status_share= 0;
5818 ndb_binlog_tables_inited= FALSE;
5819 }
5820 /* ToDo: remove printout */
5821 if (opt_ndb_extra_logging)
5822 sql_print_information("NDB Binlog: drop table %s.", share->key);
5823 // fall through
5824 case NDBEVENT::TE_ALTER:
5825 row.n_schemaops++;
5826 DBUG_PRINT("info", ("TABLE %s EVENT: %s received share: 0x%lx op: 0x%lx "
5827 "share op: 0x%lx new_op: 0x%lx",
5828 type == NDBEVENT::TE_DROP ? "DROP" : "ALTER",
5829 share->key, (long) share, (long) pOp,
5830 (long) share->op, (long) share->new_op));
5831 break;
5832 case NDBEVENT::TE_NODE_FAILURE:
5833 /* fall through */
5834 case NDBEVENT::TE_SUBSCRIBE:
5835 /* fall through */
5836 case NDBEVENT::TE_UNSUBSCRIBE:
5837 /* ignore */
5838 return 0;
5839 default:
5840 sql_print_error("NDB Binlog: unknown non data event %d for %s. "
5841 "Ignoring...", (unsigned) type, share->key);
5842 return 0;
5843 }
5844
5845 ndb_handle_schema_change(thd, injector_ndb, pOp, event_data);
5846 return 0;
5847 }
5848
5849 /*
5850 Handle data events from the storage nodes
5851 */
5852 inline ndb_binlog_index_row *
ndb_find_binlog_index_row(ndb_binlog_index_row ** rows,uint orig_server_id,int flag)5853 ndb_find_binlog_index_row(ndb_binlog_index_row **rows,
5854 uint orig_server_id, int flag)
5855 {
5856 ndb_binlog_index_row *row= *rows;
5857 if (opt_ndb_log_orig)
5858 {
5859 ndb_binlog_index_row *first= row, *found_id= 0;
5860 for (;;)
5861 {
5862 if (row->orig_server_id == orig_server_id)
5863 {
5864 /* */
5865 if (!flag || !row->orig_epoch)
5866 return row;
5867 if (!found_id)
5868 found_id= row;
5869 }
5870 if (row->orig_server_id == 0)
5871 break;
5872 row= row->next;
5873 if (row == NULL)
5874 {
5875 row= (ndb_binlog_index_row*)sql_alloc(sizeof(ndb_binlog_index_row));
5876 memset(row, 0, sizeof(ndb_binlog_index_row));
5877 row->next= first;
5878 *rows= row;
5879 if (found_id)
5880 {
5881 /*
5882 If we found index_row with same server id already
5883 that row will contain the current stats.
5884 Copy stats over to new and reset old.
5885 */
5886 row->n_inserts= found_id->n_inserts;
5887 row->n_updates= found_id->n_updates;
5888 row->n_deletes= found_id->n_deletes;
5889 found_id->n_inserts= 0;
5890 found_id->n_updates= 0;
5891 found_id->n_deletes= 0;
5892 }
5893 /* keep track of schema ops only on "first" index_row */
5894 row->n_schemaops= first->n_schemaops;
5895 first->n_schemaops= 0;
5896 break;
5897 }
5898 }
5899 row->orig_server_id= orig_server_id;
5900 }
5901 return row;
5902 }
5903
/*
  Handle a data event (insert/update/delete) from the storage nodes by
  writing the corresponding row event into the injector transaction.

  @param ndb                    not used in this function
  @param pOp                    event operation carrying the event
  @param rows                   ndb_binlog_index statistics rows; see
                                ndb_find_binlog_index_row()
  @param trans                  injector transaction receiving row events
  @param trans_row_count        incremented for every counted row event
  @param trans_slave_row_count  incremented for every counted event that
                                has an originating server id set

  The event is skipped (nothing written) when pOp is not the share's
  current op, when its AnyValue is reserved, when it has an originating
  server id and log_this_slave_update is false, or when it is an
  ndb_apply_status event and opt_ndb_log_apply_status is off.

  @return always 0
*/
static int
ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
                                    ndb_binlog_index_row **rows,
                                    injector::transaction &trans,
                                    unsigned &trans_row_count,
                                    unsigned &trans_slave_row_count)
{
  Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
  TABLE *table= event_data->shadow_table;
  NDB_SHARE *share= event_data->share;
  /* Events from an op that is no longer the share's current op are skipped */
  if (pOp != share->op)
  {
    return 0;
  }

  uint32 anyValue= pOp->getAnyValue();
  if (ndbcluster_anyvalue_is_reserved(anyValue))
  {
    if (!ndbcluster_anyvalue_is_nologging(anyValue))
      sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
                        "event not logged",
                        anyValue);
    return 0;
  }
  uint32 originating_server_id= ndbcluster_anyvalue_get_serverid(anyValue);
  bool log_this_slave_update = g_ndb_log_slave_updates;
  bool count_this_event = true;

  if (share == ndb_apply_status_share)
  {
    /*
      Note that option values are read without synchronisation w.r.t.
      thread setting option variable or epoch boundaries.
    */
    if (opt_ndb_log_apply_status ||
        opt_ndb_log_orig)
    {
      Uint32 ndb_apply_status_logging_server_id= originating_server_id;
      Uint32 ndb_apply_status_server_id= 0;
      Uint64 ndb_apply_status_epoch= 0;
      bool event_has_data = false;

      switch(pOp->getEventType())
      {
      case NDBEVENT::TE_INSERT:
        // fall through
      case NDBEVENT::TE_UPDATE:
        event_has_data = true;
        break;
      case NDBEVENT::TE_DELETE:
        break;
      default:
        /* We should REALLY never get here */
        abort();
      }

      if (likely( event_has_data ))
      {
        /*
          unpack data to fetch orig_server_id and orig_epoch.
          NOTE(review): the fixed 128-bit bitbuf assumes ndb_apply_status
          has at most 128 fields -- confirm.
        */
        uint n_fields= table->s->fields;
        MY_BITMAP b;
        uint32 bitbuf[128 / (sizeof(uint32) * 8)];
        bitmap_init(&b, bitbuf, n_fields, FALSE);
        bitmap_set_all(&b);
        ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
        /* field[0] is read as the server id, field[1] as the epoch */
        ndb_apply_status_server_id= (uint)((Field_long *)table->field[0])->val_int();
        ndb_apply_status_epoch= ((Field_longlong *)table->field[1])->val_int();

        if (opt_ndb_log_apply_status)
        {
          /*
            Determine if event came from our immediate Master server
            Ignore locally manually sourced and reserved events
          */
          if ((ndb_apply_status_logging_server_id != 0) &&
              (! ndbcluster_anyvalue_is_reserved(ndb_apply_status_logging_server_id)))
          {
            bool isFromImmediateMaster = (ndb_apply_status_server_id ==
                                          ndb_apply_status_logging_server_id);

            if (isFromImmediateMaster)
            {
              /*
                We log this event with our server-id so that it
                propagates back to the originating Master (our
                immediate Master)
              */
              assert(ndb_apply_status_logging_server_id != ::server_id);

              originating_server_id= 0; /* Will be set to our ::serverid below */
            }
          }
        }

        if (opt_ndb_log_orig)
        {
          /* store the originating epoch on the row for that server id */
          ndb_binlog_index_row *row= ndb_find_binlog_index_row
            (rows, ndb_apply_status_server_id, 1);
          row->orig_epoch= ndb_apply_status_epoch;
        }
      }
    } // opt_ndb_log_apply_status || opt_ndb_log_orig)

    if (opt_ndb_log_apply_status)
    {
      /* We are logging ndb_apply_status changes
       * Don't count this event as making an epoch non-empty
       * Log this event in the Binlog
       */
      count_this_event = false;
      log_this_slave_update = true;
    }
    else
    {
      /* Not logging ndb_apply_status updates, discard this event now */
      return 0;
    }
  }

  if (originating_server_id == 0)
    originating_server_id= ::server_id;
  else
  {
    /* Track that we received a replicated row event */
    if (likely( count_this_event ))
      trans_slave_row_count++;

    if (!log_this_slave_update)
    {
      /*
        This event comes from a slave applier since it has an originating
        server id set. Since option to log slave updates is not set, skip it.
      */
      return 0;
    }
  }

  /*
    Start with logged_server_id as AnyValue in case it's a composite
    (server_id_bits < 31).  This way any user-values are passed-through
    to the Binlog in the high bits of the event's Server Id.
    In future it may be useful to support *not* mapping composite
    AnyValues to/from Binlogged server-ids.
  */
  uint32 logged_server_id= anyValue;
  ndbcluster_anyvalue_set_serverid(logged_server_id, originating_server_id);

  DBUG_ASSERT(trans.good());
  DBUG_ASSERT(table != 0);

  dbug_print_table("table", table);

  uint n_fields= table->s->fields;
  DBUG_PRINT("info", ("Assuming %u columns for table %s",
                      n_fields, table->s->table_name.str));
  MY_BITMAP b;
  /*
    Potential buffer for the bitmap. If it is too small, bitmap_init() is
    given NULL and the bitmap is released with bitmap_free() at the end.
    NOTE(review): bitmap_init()'s return value is not checked, so an
    allocation failure in that case is not handled.
  */
  uint32 bitbuf[128 / (sizeof(uint32) * 8)];
  const bool own_buffer = n_fields <= sizeof(bitbuf) * 8;
  bitmap_init(&b, own_buffer ? bitbuf : NULL, n_fields, FALSE);
  bitmap_set_all(&b);

  /*
   row data is already in table->record[0]
   As we told the NdbEventOperation to do this
   (saves moving data about many times)
  */

  /*
    for now malloc/free blobs buffer each time
    TODO if possible share single permanent buffer with handlers
   */
  uchar* blobs_buffer[2] = { 0, 0 };
  uint blobs_buffer_size[2] = { 0, 0 };

  ndb_binlog_index_row *row=
    ndb_find_binlog_index_row(rows, originating_server_id, 0);

  switch(pOp->getEventType())
  {
  case NDBEVENT::TE_INSERT:
    if (likely( count_this_event ))
    {
      row->n_inserts++;
      trans_row_count++;
    }
    DBUG_PRINT("info", ("INSERT INTO %s.%s",
                        table->s->db.str, table->s->table_name.str));
    {
      int ret;
      if (share->flags & NSF_BLOB_FLAG)
      {
        my_ptrdiff_t ptrdiff= 0;
        ret = get_ndb_blobs_value(table, event_data->ndb_value[0],
                                  blobs_buffer[0],
                                  blobs_buffer_size[0],
                                  ptrdiff);
        assert(ret == 0);
      }
      ndb_unpack_record(table, event_data->ndb_value[0], &b, table->record[0]);
      ret = trans.write_row(logged_server_id,
                            injector::transaction::table(table, true),
                            &b, n_fields, table->record[0]);
      assert(ret == 0);
    }
    break;
  case NDBEVENT::TE_DELETE:
    if (likely( count_this_event ))
    {
      row->n_deletes++;
      trans_row_count++;
    }
    DBUG_PRINT("info",("DELETE FROM %s.%s",
                       table->s->db.str, table->s->table_name.str));
    {
      /*
        table->record[0] contains only the primary key in this case
        since we do not have an after image
      */
      int n;
      if (!get_binlog_full(share) && table->s->primary_key != MAX_KEY)
        n= 0; /*
                use the primary key only as it save time and space and
                it is the only thing needed to log the delete
              */
      else
        n= 1; /*
                we use the before values since we don't have a primary key
                since the mysql server does not handle the hidden primary
                key
              */

      int ret;
      if (share->flags & NSF_BLOB_FLAG)
      {
        my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
        ret = get_ndb_blobs_value(table, event_data->ndb_value[n],
                                  blobs_buffer[n],
                                  blobs_buffer_size[n],
                                  ptrdiff);
        assert(ret == 0);
      }
      ndb_unpack_record(table, event_data->ndb_value[n], &b, table->record[n]);
      DBUG_EXECUTE("info", print_records(table, table->record[n]););
      ret = trans.delete_row(logged_server_id,
                             injector::transaction::table(table, true),
                             &b, n_fields, table->record[n]);
      assert(ret == 0);
    }
    break;
  case NDBEVENT::TE_UPDATE:
    if (likely( count_this_event ))
    {
      row->n_updates++;
      trans_row_count++;
    }
    DBUG_PRINT("info", ("UPDATE %s.%s",
                        table->s->db.str, table->s->table_name.str));
    {
      int ret;
      if (share->flags & NSF_BLOB_FLAG)
      {
        my_ptrdiff_t ptrdiff= 0;
        ret = get_ndb_blobs_value(table, event_data->ndb_value[0],
                                  blobs_buffer[0],
                                  blobs_buffer_size[0],
                                  ptrdiff);
        assert(ret == 0);
      }
      ndb_unpack_record(table, event_data->ndb_value[0],
                        &b, table->record[0]);
      DBUG_EXECUTE("info", print_records(table, table->record[0]););
      if (table->s->primary_key != MAX_KEY &&
          !get_binlog_use_update(share))
      {
        /*
          since table has a primary key, we can do a write
          using only after values
        */
        ret = trans.write_row(logged_server_id,
                              injector::transaction::table(table, true),
                              &b, n_fields, table->record[0]);// after values
        assert(ret == 0);
      }
      else
      {
        /*
          mysql server cannot handle the ndb hidden key and
          therefore needs the before image as well
        */
        if (share->flags & NSF_BLOB_FLAG)
        {
          my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
          ret = get_ndb_blobs_value(table, event_data->ndb_value[1],
                                    blobs_buffer[1],
                                    blobs_buffer_size[1],
                                    ptrdiff);
          assert(ret == 0);
        }
        ndb_unpack_record(table, event_data->ndb_value[1], &b, table->record[1]);
        DBUG_EXECUTE("info", print_records(table, table->record[1]););
        ret = trans.update_row(logged_server_id,
                               injector::transaction::table(table, true),
                               &b, n_fields,
                               table->record[1], // before values
                               table->record[0]);// after values
        assert(ret == 0);
      }
    }
    break;
  default:
    /* We should REALLY never get here. */
    DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
    break;
  }

  /* release the per-event blob buffers */
  if (share->flags & NSF_BLOB_FLAG)
  {
    my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
    my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
  }

  /* release the bitmap allocated by bitmap_init() when bitbuf was too small */
  if (!own_buffer)
  {
    bitmap_free(&b);
  }

  return 0;
}
6234
//#define RUN_NDB_BINLOG_TIMER
#ifdef RUN_NDB_BINLOG_TIMER
/*
  Wall-clock stopwatch based on gettimeofday(), only compiled when
  RUN_NDB_BINLOG_TIMER is defined. Timing starts at construction.
*/
class Timer
{
public:
  Timer() { start(); }
  void start() { gettimeofday(&m_start, 0); }
  void stop() { gettimeofday(&m_stop, 0); }

  /*
    Milliseconds between the last start() and stop(): the whole-second
    difference times 1000, plus the microsecond difference divided by
    1000 after adding 999.
  */
  ulong elapsed_ms()
  {
    const longlong whole_sec_ms=
      ((longlong) m_stop.tv_sec - (longlong) m_start.tv_sec) * 1000;
    const longlong usec_part_ms=
      ((longlong) m_stop.tv_usec - (longlong) m_start.tv_usec + 999) / 1000;
    return (ulong) (whole_sec_ms + usec_part_ms);
  }

private:
  struct timeval m_start;
  struct timeval m_stop;
};
#endif
6254
6255 /****************************************************************
6256 Injector thread main loop
6257 ****************************************************************/
6258
6259 static uchar *
ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT * schema_object,size_t * length,my_bool not_used MY_ATTRIBUTE ((unused)))6260 ndb_schema_objects_get_key(NDB_SCHEMA_OBJECT *schema_object,
6261 size_t *length,
6262 my_bool not_used MY_ATTRIBUTE((unused)))
6263 {
6264 *length= schema_object->key_length;
6265 return (uchar*) schema_object->key;
6266 }
6267
ndb_get_schema_object(const char * key,my_bool create_if_not_exists,my_bool have_lock)6268 static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
6269 my_bool create_if_not_exists,
6270 my_bool have_lock)
6271 {
6272 NDB_SCHEMA_OBJECT *ndb_schema_object;
6273 uint length= (uint) strlen(key);
6274 DBUG_ENTER("ndb_get_schema_object");
6275 DBUG_PRINT("enter", ("key: '%s'", key));
6276
6277 if (!have_lock)
6278 pthread_mutex_lock(&ndbcluster_mutex);
6279 while (!(ndb_schema_object=
6280 (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
6281 (const uchar*) key,
6282 length)))
6283 {
6284 if (!create_if_not_exists)
6285 {
6286 DBUG_PRINT("info", ("does not exist"));
6287 break;
6288 }
6289 if (!(ndb_schema_object=
6290 (NDB_SCHEMA_OBJECT*) my_malloc(sizeof(*ndb_schema_object) + length + 1,
6291 MYF(MY_WME | MY_ZEROFILL))))
6292 {
6293 DBUG_PRINT("info", ("malloc error"));
6294 break;
6295 }
6296 ndb_schema_object->key= (char *)(ndb_schema_object+1);
6297 memcpy(ndb_schema_object->key, key, length + 1);
6298 ndb_schema_object->key_length= length;
6299 if (my_hash_insert(&ndb_schema_objects, (uchar*) ndb_schema_object))
6300 {
6301 my_free((uchar*) ndb_schema_object, 0);
6302 break;
6303 }
6304 pthread_mutex_init(&ndb_schema_object->mutex, MY_MUTEX_INIT_FAST);
6305 bitmap_init(&ndb_schema_object->slock_bitmap, ndb_schema_object->slock,
6306 sizeof(ndb_schema_object->slock)*8, FALSE);
6307 bitmap_clear_all(&ndb_schema_object->slock_bitmap);
6308 break;
6309 }
6310 if (ndb_schema_object)
6311 {
6312 ndb_schema_object->use_count++;
6313 DBUG_PRINT("info", ("use_count: %d", ndb_schema_object->use_count));
6314 }
6315 if (!have_lock)
6316 pthread_mutex_unlock(&ndbcluster_mutex);
6317 DBUG_RETURN(ndb_schema_object);
6318 }
6319
6320
ndb_free_schema_object(NDB_SCHEMA_OBJECT ** ndb_schema_object,bool have_lock)6321 static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
6322 bool have_lock)
6323 {
6324 DBUG_ENTER("ndb_free_schema_object");
6325 DBUG_PRINT("enter", ("key: '%s'", (*ndb_schema_object)->key));
6326 if (!have_lock)
6327 pthread_mutex_lock(&ndbcluster_mutex);
6328 if (!--(*ndb_schema_object)->use_count)
6329 {
6330 DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
6331 my_hash_delete(&ndb_schema_objects, (uchar*) *ndb_schema_object);
6332 pthread_mutex_destroy(&(*ndb_schema_object)->mutex);
6333 my_free((uchar*) *ndb_schema_object, MYF(0));
6334 *ndb_schema_object= 0;
6335 }
6336 else
6337 {
6338 DBUG_PRINT("info", ("use_count: %d", (*ndb_schema_object)->use_count));
6339 }
6340 if (!have_lock)
6341 pthread_mutex_unlock(&ndbcluster_mutex);
6342 DBUG_VOID_RETURN;
6343 }
6344
6345
6346 static void
remove_event_operations(Ndb * ndb)6347 remove_event_operations(Ndb* ndb)
6348 {
6349 DBUG_ENTER("remove_event_operations");
6350 NdbEventOperation *op;
6351 while ((op= ndb->getEventOperation()))
6352 {
6353 DBUG_ASSERT(!IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
6354 DBUG_PRINT("info", ("removing event operation on %s",
6355 op->getEvent()->getName()));
6356
6357 Ndb_event_data *event_data= (Ndb_event_data *) op->getCustomData();
6358 DBUG_ASSERT(event_data);
6359
6360 NDB_SHARE *share= event_data->share;
6361 DBUG_ASSERT(share != NULL);
6362 DBUG_ASSERT(share->op == op || share->new_op == op);
6363
6364 delete event_data;
6365 op->setCustomData(NULL);
6366
6367 pthread_mutex_lock(&share->mutex);
6368 share->op= 0;
6369 share->new_op= 0;
6370 pthread_mutex_unlock(&share->mutex);
6371
6372 DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u",
6373 share->key, share->use_count));
6374 free_share(&share);
6375
6376 ndb->dropEventOperation(op);
6377 }
6378 DBUG_VOID_RETURN;
6379 }
6380
6381 extern long long g_event_data_count;
6382 extern long long g_event_nondata_count;
6383 extern long long g_event_bytes_count;
6384
updateInjectorStats(Ndb * schemaNdb,Ndb * dataNdb)6385 void updateInjectorStats(Ndb* schemaNdb, Ndb* dataNdb)
6386 {
6387 /* Update globals to sum of totals from each listening
6388 * Ndb object
6389 */
6390 g_event_data_count =
6391 schemaNdb->getClientStat(Ndb::DataEventsRecvdCount) +
6392 dataNdb->getClientStat(Ndb::DataEventsRecvdCount);
6393 g_event_nondata_count =
6394 schemaNdb->getClientStat(Ndb::NonDataEventsRecvdCount) +
6395 dataNdb->getClientStat(Ndb::NonDataEventsRecvdCount);
6396 g_event_bytes_count =
6397 schemaNdb->getClientStat(Ndb::EventBytesRecvdCount) +
6398 dataNdb->getClientStat(Ndb::EventBytesRecvdCount);
6399 }
6400
/*
  State of the binlog injector thread's main loop.
  BCCC_running must remain 0: the thread tests its state variable as a
  boolean meaning "a close (exit or restart) has been requested".
*/
enum Binlog_thread_state
{
  BCCC_running= 0,  /* keep processing events */
  BCCC_exit= 1,     /* stop once the latest transaction epoch is handled */
  BCCC_restart= 2   /* leave the loop at once and restart the binlog setup */
};
6407
6408 extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
6409 extern ulong opt_ndb_report_thresh_binlog_mem_usage;
6410
6411 pthread_handler_t
ndb_binlog_thread_func(void * arg)6412 ndb_binlog_thread_func(void *arg)
6413 {
6414 THD *thd; /* needs to be first for thread_stack */
6415 Ndb *i_ndb= 0;
6416 Ndb *s_ndb= 0;
6417 Thd_ndb *thd_ndb=0;
6418 injector *inj= injector::instance();
6419 uint incident_id= 0;
6420 Binlog_thread_state do_ndbcluster_binlog_close_connection;
6421
6422 /**
6423 * If we get error after having reported incident
6424 * but before binlog started...we do "Restarting Cluster Binlog"
6425 * in that case, don't report incident again
6426 */
6427 bool do_incident = true;
6428
6429 #ifdef RUN_NDB_BINLOG_TIMER
6430 Timer main_timer;
6431 #endif
6432
6433 pthread_mutex_lock(&injector_mutex);
6434 /*
6435 Set up the Thread
6436 */
6437 my_thread_init();
6438 DBUG_ENTER("ndb_binlog_thread");
6439
6440 thd= new THD; /* note that contructor of THD uses DBUG_ */
6441 THD_CHECK_SENTRY(thd);
6442
6443 /* We need to set thd->thread_id before thd->store_globals, or it will
6444 set an invalid value for thd->variables.pseudo_thread_id.
6445 */
6446 mysql_mutex_lock(&LOCK_thread_count);
6447 thd->thread_id= thread_id++;
6448 mysql_mutex_unlock(&LOCK_thread_count);
6449
6450 thd->thread_stack= (char*) &thd; /* remember where our stack is */
6451 if (thd->store_globals())
6452 {
6453 thd->cleanup();
6454 delete thd;
6455 ndb_binlog_thread_running= -1;
6456 pthread_mutex_unlock(&injector_mutex);
6457 pthread_cond_signal(&injector_cond);
6458
6459 DBUG_LEAVE; // Must match DBUG_ENTER()
6460 my_thread_end();
6461 pthread_exit(0);
6462 return NULL; // Avoid compiler warnings
6463 }
6464 lex_start(thd);
6465
6466 thd->init_for_queries();
6467 thd_set_command(thd, COM_DAEMON);
6468 thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
6469 #ifndef NDB_THD_HAS_NO_VERSION
6470 thd->version= refresh_version;
6471 #endif
6472 thd->client_capabilities= 0;
6473 thd->security_ctx->skip_grants();
6474 my_net_init(&thd->net, 0);
6475
6476 // Ndb binlog thread always use row format
6477 thd->set_current_stmt_binlog_format_row();
6478
6479 /*
6480 Set up ndb binlog
6481 */
6482 sql_print_information("Starting Cluster Binlog Thread");
6483
6484 pthread_detach_this_thread();
6485 thd->real_id= pthread_self();
6486 mysql_mutex_lock(&LOCK_thread_count);
6487 add_global_thread(thd);
6488 mysql_mutex_unlock(&LOCK_thread_count);
6489 thd->lex->start_transaction_opt= 0;
6490
6491
6492 restart_cluster_failure:
6493 int have_injector_mutex_lock= 0;
6494 do_ndbcluster_binlog_close_connection= BCCC_exit;
6495
6496 if (!(thd_ndb= Thd_ndb::seize(thd)))
6497 {
6498 sql_print_error("Could not allocate Thd_ndb object");
6499 ndb_binlog_thread_running= -1;
6500 pthread_mutex_unlock(&injector_mutex);
6501 pthread_cond_signal(&injector_cond);
6502 goto err;
6503 }
6504
6505 if (!(s_ndb= new Ndb(g_ndb_cluster_connection, NDB_REP_DB)) ||
6506 s_ndb->init())
6507 {
6508 sql_print_error("NDB Binlog: Getting Schema Ndb object failed");
6509 ndb_binlog_thread_running= -1;
6510 pthread_mutex_unlock(&injector_mutex);
6511 pthread_cond_signal(&injector_cond);
6512 goto err;
6513 }
6514
6515 // empty database
6516 if (!(i_ndb= new Ndb(g_ndb_cluster_connection, "")) ||
6517 i_ndb->init())
6518 {
6519 sql_print_error("NDB Binlog: Getting Ndb object failed");
6520 ndb_binlog_thread_running= -1;
6521 pthread_mutex_unlock(&injector_mutex);
6522 pthread_cond_signal(&injector_cond);
6523 goto err;
6524 }
6525
6526 /* init hash for schema object distribution */
6527 (void) my_hash_init(&ndb_schema_objects, system_charset_info, 32, 0, 0,
6528 (my_hash_get_key)ndb_schema_objects_get_key, 0, 0);
6529
6530 /*
6531 Expose global reference to our ndb object.
6532
6533 Used by both sql client thread and binlog thread to interact
6534 with the storage
6535 pthread_mutex_lock(&injector_mutex);
6536 */
6537 injector_thd= thd;
6538 injector_ndb= i_ndb;
6539 schema_ndb= s_ndb;
6540
6541 if (opt_bin_log && opt_ndb_log_bin)
6542 {
6543 ndb_binlog_running= TRUE;
6544 }
6545
6546 /* Thread start up completed */
6547 ndb_binlog_thread_running= 1;
6548 pthread_mutex_unlock(&injector_mutex);
6549 pthread_cond_signal(&injector_cond);
6550
6551 /*
6552 wait for mysql server to start (so that the binlog is started
6553 and thus can receive the first GAP event)
6554 */
6555 mysql_mutex_lock(&LOCK_server_started);
6556 while (!mysqld_server_started)
6557 {
6558 struct timespec abstime;
6559 set_timespec(abstime, 1);
6560 mysql_cond_timedwait(&COND_server_started, &LOCK_server_started,
6561 &abstime);
6562 if (ndbcluster_terminating)
6563 {
6564 mysql_mutex_unlock(&LOCK_server_started);
6565 goto err;
6566 }
6567 }
6568 mysql_mutex_unlock(&LOCK_server_started);
6569 /*
6570 Main NDB Injector loop
6571 */
6572 while (do_incident && ndb_binlog_running)
6573 {
6574 /*
6575 check if it is the first log, if so we do not insert a GAP event
6576 as there is really no log to have a GAP in
6577 */
6578 if (incident_id == 0)
6579 {
6580 LOG_INFO log_info;
6581 mysql_bin_log.get_current_log(&log_info);
6582 int len= strlen(log_info.log_file_name);
6583 uint no= 0;
6584 if ((sscanf(log_info.log_file_name + len - 6, "%u", &no) == 1) &&
6585 no == 1)
6586 {
6587 /* this is the fist log, so skip GAP event */
6588 break;
6589 }
6590 }
6591
6592 /*
6593 Always insert a GAP event as we cannot know what has happened
6594 in the cluster while not being connected.
6595 */
6596 LEX_STRING const msg[2]=
6597 {
6598 { C_STRING_WITH_LEN("mysqld startup") },
6599 { C_STRING_WITH_LEN("cluster disconnect")}
6600 };
6601 int ret = inj->record_incident(thd, INCIDENT_LOST_EVENTS,
6602 msg[incident_id]);
6603 assert(ret == 0); NDB_IGNORE_VALUE(ret);
6604 do_incident = false; // Don't report incident again, unless we get started
6605 break;
6606 }
6607 incident_id= 1;
6608 {
6609 thd->proc_info= "Waiting for ndbcluster to start";
6610
6611 pthread_mutex_lock(&injector_mutex);
6612 while (!ndb_schema_share ||
6613 (ndb_binlog_running && !ndb_apply_status_share) ||
6614 !ndb_binlog_tables_inited)
6615 {
6616 if (!thd_ndb->valid_ndb())
6617 {
6618 /*
6619 Cluster has gone away before setup was completed.
6620 Keep lock on injector_mutex to prevent further
6621 usage of the injector_ndb, and restart binlog
6622 thread to get rid of any garbage on the ndb objects
6623 */
6624 have_injector_mutex_lock= 1;
6625 do_ndbcluster_binlog_close_connection= BCCC_restart;
6626 goto err;
6627 }
6628 /* ndb not connected yet */
6629 struct timespec abstime;
6630 set_timespec(abstime, 1);
6631 pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
6632 if (ndbcluster_binlog_terminating)
6633 {
6634 pthread_mutex_unlock(&injector_mutex);
6635 goto err;
6636 }
6637 }
6638 pthread_mutex_unlock(&injector_mutex);
6639
6640 DBUG_ASSERT(ndbcluster_hton->slot != ~(uint)0);
6641 thd_set_thd_ndb(thd, thd_ndb);
6642 thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
6643 thd->query_id= 0; // to keep valgrind quiet
6644 }
6645
6646 {
6647 // wait for the first event
6648 thd->proc_info= "Waiting for first event from ndbcluster";
6649 int schema_res, res;
6650 Uint64 schema_gci;
6651 do
6652 {
6653 DBUG_PRINT("info", ("Waiting for the first event"));
6654
6655 if (ndbcluster_binlog_terminating)
6656 goto err;
6657
6658 schema_res= s_ndb->pollEvents(100, &schema_gci);
6659 } while (schema_gci == 0 || ndb_latest_received_binlog_epoch == schema_gci);
6660 if (ndb_binlog_running)
6661 {
6662 Uint64 gci= i_ndb->getLatestGCI();
6663 while (gci < schema_gci || gci == ndb_latest_received_binlog_epoch)
6664 {
6665 if (ndbcluster_binlog_terminating)
6666 goto err;
6667 res= i_ndb->pollEvents(10, &gci);
6668 }
6669 if (gci > schema_gci)
6670 {
6671 schema_gci= gci;
6672 }
6673 }
6674 // now check that we have epochs consistant with what we had before the restart
6675 DBUG_PRINT("info", ("schema_res: %d schema_gci: %u/%u", schema_res,
6676 (uint)(schema_gci >> 32),
6677 (uint)(schema_gci)));
6678 {
6679 i_ndb->flushIncompleteEvents(schema_gci);
6680 s_ndb->flushIncompleteEvents(schema_gci);
6681 if (schema_gci < ndb_latest_handled_binlog_epoch)
6682 {
6683 sql_print_error("NDB Binlog: cluster has been restarted --initial or with older filesystem. "
6684 "ndb_latest_handled_binlog_epoch: %u/%u, while current epoch: %u/%u. "
6685 "RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch.",
6686 (uint)(ndb_latest_handled_binlog_epoch >> 32),
6687 (uint)(ndb_latest_handled_binlog_epoch),
6688 (uint)(schema_gci >> 32),
6689 (uint)(schema_gci));
6690 ndb_set_latest_trans_gci(0);
6691 ndb_latest_handled_binlog_epoch= 0;
6692 ndb_latest_applied_binlog_epoch= 0;
6693 ndb_latest_received_binlog_epoch= 0;
6694 }
6695 else if (ndb_latest_applied_binlog_epoch > 0)
6696 {
6697 sql_print_warning("NDB Binlog: cluster has reconnected. "
6698 "Changes to the database that occured while "
6699 "disconnected will not be in the binlog");
6700 }
6701 if (opt_ndb_extra_logging)
6702 {
6703 sql_print_information("NDB Binlog: starting log at epoch %u/%u",
6704 (uint)(schema_gci >> 32),
6705 (uint)(schema_gci));
6706 }
6707 }
6708 }
6709 /*
6710 binlog thread is ready to receive events
6711 - client threads may now start updating data, i.e. tables are
6712 no longer read only
6713 */
6714 ndb_binlog_is_ready= TRUE;
6715
6716 if (opt_ndb_extra_logging)
6717 sql_print_information("NDB Binlog: ndb tables writable");
6718 close_cached_tables((THD*) 0, (TABLE_LIST*) 0, FALSE, FALSE, FALSE);
6719
6720 /*
6721 Signal any waiting thread that ndb table setup is
6722 now complete
6723 */
6724 ndb_notify_tables_writable();
6725
6726 {
6727 static char db[]= "";
6728 thd->db= db;
6729 }
6730 do_incident = true; // If we get disconnected again...do incident report
6731 do_ndbcluster_binlog_close_connection= BCCC_running;
6732 for ( ; !((ndbcluster_binlog_terminating ||
6733 do_ndbcluster_binlog_close_connection) &&
6734 ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci()) &&
6735 do_ndbcluster_binlog_close_connection != BCCC_restart; )
6736 {
6737 #ifndef DBUG_OFF
6738 if (do_ndbcluster_binlog_close_connection)
6739 {
6740 DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection: %d, "
6741 "ndb_latest_handled_binlog_epoch: %u/%u, "
6742 "*get_latest_trans_gci(): %u/%u",
6743 do_ndbcluster_binlog_close_connection,
6744 (uint)(ndb_latest_handled_binlog_epoch >> 32),
6745 (uint)(ndb_latest_handled_binlog_epoch),
6746 (uint)(ndb_get_latest_trans_gci() >> 32),
6747 (uint)(ndb_get_latest_trans_gci())));
6748 }
6749 #endif
6750 #ifdef RUN_NDB_BINLOG_TIMER
6751 main_timer.stop();
6752 sql_print_information("main_timer %ld ms", main_timer.elapsed_ms());
6753 main_timer.start();
6754 #endif
6755
6756 /*
6757 now we don't want any events before next gci is complete
6758 */
6759 thd->proc_info= "Waiting for event from ndbcluster";
6760 thd->set_time();
6761
6762 /* wait for event or 1000 ms */
6763 Uint64 gci= 0, schema_gci;
6764 int res= 0, tot_poll_wait= 1000;
6765 if (ndb_binlog_running)
6766 {
6767 res= i_ndb->pollEvents(tot_poll_wait, &gci);
6768 tot_poll_wait= 0;
6769 }
6770 int schema_res= s_ndb->pollEvents(tot_poll_wait, &schema_gci);
6771 ndb_latest_received_binlog_epoch= gci;
6772
6773 while (gci > schema_gci && schema_res >= 0)
6774 {
6775 static char buf[64];
6776 thd->proc_info= "Waiting for schema epoch";
6777 my_snprintf(buf, sizeof(buf), "%s %u/%u(%u/%u)", thd->proc_info,
6778 (uint)(schema_gci >> 32),
6779 (uint)(schema_gci),
6780 (uint)(gci >> 32),
6781 (uint)(gci));
6782 thd->proc_info= buf;
6783 schema_res= s_ndb->pollEvents(10, &schema_gci);
6784 }
6785
6786 if ((ndbcluster_binlog_terminating ||
6787 do_ndbcluster_binlog_close_connection) &&
6788 (ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci() ||
6789 !ndb_binlog_running))
6790 break; /* Shutting down server */
6791
6792 MEM_ROOT **root_ptr=
6793 my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC);
6794 MEM_ROOT *old_root= *root_ptr;
6795 MEM_ROOT mem_root;
6796 init_sql_alloc(&mem_root, 4096, 0);
6797 List<Cluster_schema> post_epoch_log_list;
6798 List<Cluster_schema> post_epoch_unlock_list;
6799 *root_ptr= &mem_root;
6800
6801 if (unlikely(schema_res > 0))
6802 {
6803 thd->proc_info= "Processing events from schema table";
6804 g_ndb_log_slave_updates= opt_log_slave_updates;
6805 s_ndb->
6806 setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
6807 s_ndb->
6808 setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
6809 NdbEventOperation *pOp= s_ndb->nextEvent();
6810 while (pOp != NULL)
6811 {
6812 if (!pOp->hasError())
6813 {
6814 ndb_binlog_thread_handle_schema_event(thd, s_ndb, pOp,
6815 &post_epoch_log_list,
6816 &post_epoch_unlock_list,
6817 &mem_root);
6818 DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
6819 s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
6820 "<empty>"));
6821 DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
6822 i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
6823 "<empty>"));
6824 if (i_ndb->getEventOperation() == NULL &&
6825 s_ndb->getEventOperation() == NULL &&
6826 do_ndbcluster_binlog_close_connection == BCCC_running)
6827 {
6828 DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
6829 do_ndbcluster_binlog_close_connection= BCCC_restart;
6830 if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
6831 {
6832 sql_print_error("NDB Binlog: latest transaction in epoch %u/%u not in binlog "
6833 "as latest received epoch is %u/%u",
6834 (uint)(ndb_get_latest_trans_gci() >> 32),
6835 (uint)(ndb_get_latest_trans_gci()),
6836 (uint)(ndb_latest_received_binlog_epoch >> 32),
6837 (uint)(ndb_latest_received_binlog_epoch));
6838 }
6839 }
6840 }
6841 else
6842 sql_print_error("NDB: error %lu (%s) on handling "
6843 "binlog schema event",
6844 (ulong) pOp->getNdbError().code,
6845 pOp->getNdbError().message);
6846 pOp= s_ndb->nextEvent();
6847 }
6848 updateInjectorStats(s_ndb, i_ndb);
6849 }
6850
6851 if (!ndb_binlog_running)
6852 {
6853 /*
6854 Just consume any events, not used if no binlogging
6855 e.g. node failure events
6856 */
6857 Uint64 tmp_gci;
6858 if (i_ndb->pollEvents(0, &tmp_gci))
6859 {
6860 NdbEventOperation *pOp;
6861 while ((pOp= i_ndb->nextEvent()))
6862 {
6863 if ((unsigned) pOp->getEventType() >=
6864 (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
6865 {
6866 ndb_binlog_index_row row;
6867 ndb_binlog_thread_handle_non_data_event(thd, pOp, row);
6868 }
6869 }
6870 if (i_ndb->getEventOperation() == NULL &&
6871 s_ndb->getEventOperation() == NULL &&
6872 do_ndbcluster_binlog_close_connection == BCCC_running)
6873 {
6874 DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
6875 do_ndbcluster_binlog_close_connection= BCCC_restart;
6876 }
6877 }
6878 updateInjectorStats(s_ndb, i_ndb);
6879 }
6880 else if (res > 0 ||
6881 (ndb_log_empty_epochs() &&
6882 gci > ndb_latest_handled_binlog_epoch))
6883 {
6884 DBUG_PRINT("info", ("pollEvents res: %d", res));
6885 thd->proc_info= "Processing events";
6886 uchar apply_status_buf[512];
6887 TABLE *apply_status_table= NULL;
6888 if (ndb_apply_status_share)
6889 {
6890 /*
6891 We construct the buffer to write the apply status binlog
6892 event here, as the table->record[0] buffer is referenced
6893 by the apply status event operation, and will be filled
6894 with data at the nextEvent call if the first event should
6895 happen to be from the apply status table
6896 */
6897 Ndb_event_data *event_data= ndb_apply_status_share->event_data;
6898 if (!event_data)
6899 {
6900 DBUG_ASSERT(ndb_apply_status_share->op);
6901 event_data=
6902 (Ndb_event_data *) ndb_apply_status_share->op->getCustomData();
6903 DBUG_ASSERT(event_data);
6904 }
6905 apply_status_table= event_data->shadow_table;
6906
6907 /*
6908 Intialize apply_status_table->record[0]
6909 */
6910 empty_record(apply_status_table);
6911
6912 apply_status_table->field[0]->store((longlong)::server_id, true);
6913 /*
6914 gci is added later, just before writing to binlog as gci
6915 is unknown here
6916 */
6917 apply_status_table->field[2]->store("", 0, &my_charset_bin);
6918 apply_status_table->field[3]->store((longlong)0, true);
6919 apply_status_table->field[4]->store((longlong)0, true);
6920 DBUG_ASSERT(sizeof(apply_status_buf) >= apply_status_table->s->reclength);
6921 memcpy(apply_status_buf, apply_status_table->record[0],
6922 apply_status_table->s->reclength);
6923 }
6924 NdbEventOperation *pOp= i_ndb->nextEvent();
6925 ndb_binlog_index_row _row;
6926 ndb_binlog_index_row *rows= &_row;
6927 injector::transaction trans;
6928 unsigned trans_row_count= 0;
6929 unsigned trans_slave_row_count= 0;
6930 if (!pOp)
6931 {
6932 /*
6933 Must be an empty epoch since the condition
6934 (ndb_log_empty_epochs() &&
6935 gci > ndb_latest_handled_binlog_epoch)
6936 must be true we write empty epoch into
6937 ndb_binlog_index
6938 */
6939 DBUG_PRINT("info", ("Writing empty epoch for gci %llu", gci));
6940 DBUG_PRINT("info", ("Initializing transaction"));
6941 inj->new_trans(thd, &trans);
6942 rows= &_row;
6943 memset(&_row, 0, sizeof(_row));
6944 thd->variables.character_set_client= &my_charset_latin1;
6945 goto commit_to_binlog;
6946 }
6947 while (pOp != NULL)
6948 {
6949 rows= &_row;
6950 #ifdef RUN_NDB_BINLOG_TIMER
6951 Timer gci_timer, write_timer;
6952 int event_count= 0;
6953 gci_timer.start();
6954 #endif
6955 gci= pOp->getGCI();
6956 DBUG_PRINT("info", ("Handling gci: %u/%u",
6957 (uint)(gci >> 32),
6958 (uint)(gci)));
6959 // sometimes get TE_ALTER with invalid table
6960 DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
6961 ! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
6962 DBUG_ASSERT(gci <= ndb_latest_received_binlog_epoch);
6963
6964 /* initialize some variables for this epoch */
6965 g_ndb_log_slave_updates= opt_log_slave_updates;
6966 i_ndb->
6967 setReportThreshEventGCISlip(opt_ndb_report_thresh_binlog_epoch_slip);
6968 i_ndb->setReportThreshEventFreeMem(opt_ndb_report_thresh_binlog_mem_usage);
6969
6970 memset(&_row, 0, sizeof(_row));
6971 thd->variables.character_set_client= &my_charset_latin1;
6972 DBUG_PRINT("info", ("Initializing transaction"));
6973 inj->new_trans(thd, &trans);
6974 trans_row_count= 0;
6975 trans_slave_row_count= 0;
6976 // pass table map before epoch
6977 {
6978 Uint32 iter= 0;
6979 const NdbEventOperation *gci_op;
6980 Uint32 event_types;
6981
6982 if (!i_ndb->isConsistentGCI(gci))
6983 {
6984 char errmsg[64];
6985 uint end= sprintf(&errmsg[0],
6986 "Detected missing data in GCI %llu, "
6987 "inserting GAP event", gci);
6988 errmsg[end]= '\0';
6989 DBUG_PRINT("info",
6990 ("Detected missing data in GCI %llu, "
6991 "inserting GAP event", gci));
6992 LEX_STRING const msg= { C_STRING_WITH_LEN(errmsg) };
6993 inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg);
6994 }
6995 while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
6996 != NULL)
6997 {
6998 Ndb_event_data *event_data=
6999 (Ndb_event_data *) gci_op->getCustomData();
7000 NDB_SHARE *share= (event_data)?event_data->share:NULL;
7001 DBUG_PRINT("info", ("per gci_op: 0x%lx share: 0x%lx event_types: 0x%x",
7002 (long) gci_op, (long) share, event_types));
7003 // workaround for interface returning TE_STOP events
7004 // which are normally filtered out below in the nextEvent loop
7005 if ((event_types & ~NdbDictionary::Event::TE_STOP) == 0)
7006 {
7007 DBUG_PRINT("info", ("Skipped TE_STOP on table %s",
7008 gci_op->getEvent()->getTable()->getName()));
7009 continue;
7010 }
7011 // this should not happen
7012 if (share == NULL || event_data->shadow_table == NULL)
7013 {
7014 DBUG_PRINT("info", ("no share or table %s!",
7015 gci_op->getEvent()->getTable()->getName()));
7016 continue;
7017 }
7018 if (share == ndb_apply_status_share)
7019 {
7020 // skip this table, it is handled specially
7021 continue;
7022 }
7023 TABLE *table= event_data->shadow_table;
7024 #ifndef DBUG_OFF
7025 const LEX_STRING &name= table->s->table_name;
7026 #endif
7027 if ((event_types & (NdbDictionary::Event::TE_INSERT |
7028 NdbDictionary::Event::TE_UPDATE |
7029 NdbDictionary::Event::TE_DELETE)) == 0)
7030 {
7031 DBUG_PRINT("info", ("skipping non data event table: %.*s",
7032 (int) name.length, name.str));
7033 continue;
7034 }
7035 if (!trans.good())
7036 {
7037 DBUG_PRINT("info",
7038 ("Found new data event, initializing transaction"));
7039 inj->new_trans(thd, &trans);
7040 }
7041 DBUG_PRINT("info", ("use_table: %.*s, cols %u",
7042 (int) name.length, name.str,
7043 table->s->fields));
7044 injector::transaction::table tbl(table, true);
7045 int ret = trans.use_table(::server_id, tbl);
7046 assert(ret == 0); NDB_IGNORE_VALUE(ret);
7047 }
7048 }
7049 if (trans.good())
7050 {
7051 if (apply_status_table)
7052 {
7053 #ifndef DBUG_OFF
7054 const LEX_STRING& name= apply_status_table->s->table_name;
7055 DBUG_PRINT("info", ("use_table: %.*s",
7056 (int) name.length, name.str));
7057 #endif
7058 injector::transaction::table tbl(apply_status_table, true);
7059 int ret = trans.use_table(::server_id, tbl);
7060 assert(ret == 0); NDB_IGNORE_VALUE(ret);
7061
7062 /* add the gci to the record */
7063 Field *field= apply_status_table->field[1];
7064 my_ptrdiff_t row_offset=
7065 (my_ptrdiff_t) (apply_status_buf - apply_status_table->record[0]);
7066 field->move_field_offset(row_offset);
7067 field->store((longlong)gci, true);
7068 field->move_field_offset(-row_offset);
7069
7070 trans.write_row(::server_id,
7071 injector::transaction::table(apply_status_table,
7072 true),
7073 &apply_status_table->s->all_set,
7074 apply_status_table->s->fields,
7075 apply_status_buf);
7076 }
7077 else
7078 {
7079 sql_print_error("NDB: Could not get apply status share");
7080 }
7081 }
7082 #ifdef RUN_NDB_BINLOG_TIMER
7083 write_timer.start();
7084 #endif
7085 do
7086 {
7087 #ifdef RUN_NDB_BINLOG_TIMER
7088 event_count++;
7089 #endif
7090 if (pOp->hasError() &&
7091 ndb_binlog_thread_handle_error(i_ndb, pOp) < 0)
7092 goto err;
7093
7094 #ifndef DBUG_OFF
7095 {
7096 Ndb_event_data *event_data=
7097 (Ndb_event_data *) pOp->getCustomData();
7098 NDB_SHARE *share= (event_data)?event_data->share:NULL;
7099 DBUG_PRINT("info",
7100 ("EVENT TYPE: %d GCI: %u/%u last applied: %u/%u "
7101 "share: 0x%lx (%s.%s)", pOp->getEventType(),
7102 (uint)(gci >> 32),
7103 (uint)(gci),
7104 (uint)(ndb_latest_applied_binlog_epoch >> 32),
7105 (uint)(ndb_latest_applied_binlog_epoch),
7106 (long) share,
7107 share ? share->db : "'NULL'",
7108 share ? share->table_name : "'NULL'"));
7109 DBUG_ASSERT(share != 0);
7110 }
7111 // assert that there is consistancy between gci op list
7112 // and event list
7113 {
7114 Uint32 iter= 0;
7115 const NdbEventOperation *gci_op;
7116 Uint32 event_types;
7117 while ((gci_op= i_ndb->getGCIEventOperations(&iter, &event_types))
7118 != NULL)
7119 {
7120 if (gci_op == pOp)
7121 break;
7122 }
7123 DBUG_ASSERT(gci_op == pOp);
7124 DBUG_ASSERT((event_types & pOp->getEventType()) != 0);
7125 }
7126 #endif
7127 if ((unsigned) pOp->getEventType() <
7128 (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
7129 ndb_binlog_thread_handle_data_event(i_ndb, pOp, &rows, trans, trans_row_count, trans_slave_row_count);
7130 else
7131 {
7132 ndb_binlog_thread_handle_non_data_event(thd, pOp, *rows);
7133 DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
7134 s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
7135 "<empty>"));
7136 DBUG_PRINT("info", ("i_ndb first: %s", i_ndb->getEventOperation() ?
7137 i_ndb->getEventOperation()->getEvent()->getTable()->getName() :
7138 "<empty>"));
7139 if (i_ndb->getEventOperation() == NULL &&
7140 s_ndb->getEventOperation() == NULL &&
7141 do_ndbcluster_binlog_close_connection == BCCC_running)
7142 {
7143 DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
7144 do_ndbcluster_binlog_close_connection= BCCC_restart;
7145 if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
7146 {
7147 sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
7148 "as latest received epoch is %lu",
7149 (ulong) ndb_get_latest_trans_gci(),
7150 (ulong) ndb_latest_received_binlog_epoch);
7151 }
7152 }
7153 }
7154
7155 pOp= i_ndb->nextEvent();
7156 } while (pOp && pOp->getGCI() == gci);
7157
7158 updateInjectorStats(s_ndb, i_ndb);
7159
7160 /*
7161 note! pOp is not referring to an event in the next epoch
7162 or is == 0
7163 */
7164 #ifdef RUN_NDB_BINLOG_TIMER
7165 write_timer.stop();
7166 #endif
7167
7168 while (trans.good())
7169 {
7170 if (!ndb_log_empty_epochs())
7171 {
7172 /*
7173 If
7174 - We did not add any 'real' rows to the Binlog AND
7175 - We did not apply any slave row updates, only
7176 ndb_apply_status updates
7177 THEN
7178 Don't write the Binlog transaction which just
7179 contains ndb_apply_status updates.
7180 (For cicular rep with log_apply_status, ndb_apply_status
7181 updates will propagate while some related, real update
7182 is propagating)
7183 */
7184 if ((trans_row_count == 0) &&
7185 (! (opt_ndb_log_apply_status &&
7186 trans_slave_row_count) ))
7187 {
7188 /* nothing to commit, rollback instead */
7189 if (int r= trans.rollback())
7190 {
7191 sql_print_error("NDB Binlog: "
7192 "Error during ROLLBACK of GCI %u/%u. Error: %d",
7193 uint(gci >> 32), uint(gci), r);
7194 /* TODO: Further handling? */
7195 }
7196 break;
7197 }
7198 }
7199 commit_to_binlog:
7200 thd->proc_info= "Committing events to binlog";
7201 injector::transaction::binlog_pos start= trans.start_pos();
7202 if (int r= trans.commit())
7203 {
7204 sql_print_error("NDB Binlog: "
7205 "Error during COMMIT of GCI. Error: %d",
7206 r);
7207 /* TODO: Further handling? */
7208 }
7209 rows->gci= (Uint32)(gci >> 32); // Expose gci hi/lo
7210 rows->epoch= gci;
7211 rows->master_log_file= start.file_name();
7212 rows->master_log_pos= start.file_pos();
7213
7214 DBUG_PRINT("info", ("COMMIT gci: %lu", (ulong) gci));
7215 if (opt_ndb_log_binlog_index)
7216 {
7217 if (ndb_binlog_index_table__write_rows(thd, rows))
7218 {
7219 /*
7220 Writing to ndb_binlog_index failed, check if we are
7221 being killed and retry
7222 */
7223 if (thd->killed)
7224 {
7225 (void) mysql_mutex_lock(&LOCK_thread_count);
7226 volatile THD::killed_state killed= thd->killed;
7227 /* We are cleaning up, allow for flushing last epoch */
7228 thd->killed= THD::NOT_KILLED;
7229 ndb_binlog_index_table__write_rows(thd, rows);
7230 /* Restore kill flag */
7231 thd->killed= killed;
7232 (void) mysql_mutex_unlock(&LOCK_thread_count);
7233 }
7234 }
7235 }
7236 ndb_latest_applied_binlog_epoch= gci;
7237 break;
7238 }
7239 ndb_latest_handled_binlog_epoch= gci;
7240
7241 #ifdef RUN_NDB_BINLOG_TIMER
7242 gci_timer.stop();
7243 sql_print_information("gci %ld event_count %d write time "
7244 "%ld(%d e/s), total time %ld(%d e/s)",
7245 (ulong)gci, event_count,
7246 write_timer.elapsed_ms(),
7247 (1000*event_count) / write_timer.elapsed_ms(),
7248 gci_timer.elapsed_ms(),
7249 (1000*event_count) / gci_timer.elapsed_ms());
7250 #endif
7251 }
7252 if(!i_ndb->isConsistent(gci))
7253 {
7254 char errmsg[64];
7255 uint end= sprintf(&errmsg[0],
7256 "Detected missing data in GCI %llu, "
7257 "inserting GAP event", gci);
7258 errmsg[end]= '\0';
7259 DBUG_PRINT("info",
7260 ("Detected missing data in GCI %llu, "
7261 "inserting GAP event", gci));
7262 LEX_STRING const msg= { C_STRING_WITH_LEN(errmsg) };
7263 inj->record_incident(thd, INCIDENT_LOST_EVENTS, msg);
7264 }
7265 }
7266
7267 ndb_binlog_thread_handle_schema_event_post_epoch(thd,
7268 &post_epoch_log_list,
7269 &post_epoch_unlock_list);
7270 free_root(&mem_root, MYF(0));
7271 *root_ptr= old_root;
7272 ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
7273 }
7274 err:
7275 if (do_ndbcluster_binlog_close_connection != BCCC_restart)
7276 {
7277 sql_print_information("Stopping Cluster Binlog");
7278 DBUG_PRINT("info",("Shutting down cluster binlog thread"));
7279 thd->proc_info= "Shutting down";
7280 }
7281 else
7282 {
7283 sql_print_information("Restarting Cluster Binlog");
7284 DBUG_PRINT("info",("Restarting cluster binlog thread"));
7285 thd->proc_info= "Restarting";
7286 }
7287 if (!have_injector_mutex_lock)
7288 pthread_mutex_lock(&injector_mutex);
7289 /* don't mess with the injector_ndb anymore from other threads */
7290 injector_thd= 0;
7291 injector_ndb= 0;
7292 schema_ndb= 0;
7293 pthread_mutex_unlock(&injector_mutex);
7294 thd->db= 0; // as not to try to free memory
7295
7296 /*
7297 This will cause the util thread to start to try to initialize again
7298 via ndbcluster_setup_binlog_table_shares. But since injector_ndb is
7299 set to NULL it will not succeed until injector_ndb is reinitialized.
7300 */
7301 ndb_binlog_tables_inited= FALSE;
7302
7303 if (ndb_apply_status_share)
7304 {
7305 /* ndb_share reference binlog extra free */
7306 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
7307 ndb_apply_status_share->key,
7308 ndb_apply_status_share->use_count));
7309 free_share(&ndb_apply_status_share);
7310 ndb_apply_status_share= 0;
7311 }
7312 if (ndb_schema_share)
7313 {
7314 /* begin protect ndb_schema_share */
7315 pthread_mutex_lock(&ndb_schema_share_mutex);
7316 /* ndb_share reference binlog extra free */
7317 DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u",
7318 ndb_schema_share->key,
7319 ndb_schema_share->use_count));
7320 free_share(&ndb_schema_share);
7321 ndb_schema_share= 0;
7322 pthread_mutex_unlock(&ndb_schema_share_mutex);
7323 /* end protect ndb_schema_share */
7324 }
7325
7326 /* remove all event operations */
7327 if (s_ndb)
7328 {
7329 remove_event_operations(s_ndb);
7330 delete s_ndb;
7331 s_ndb= 0;
7332 }
7333 if (i_ndb)
7334 {
7335 remove_event_operations(i_ndb);
7336 delete i_ndb;
7337 i_ndb= 0;
7338 }
7339
7340 my_hash_free(&ndb_schema_objects);
7341
7342 if (thd_ndb)
7343 {
7344 Thd_ndb::release(thd_ndb);
7345 thd_set_thd_ndb(thd, NULL);
7346 thd_ndb= NULL;
7347 }
7348
7349 /**
7350 * release all extra references from tables
7351 */
7352 {
7353 if (opt_ndb_extra_logging > 9)
7354 sql_print_information("NDB Binlog: Release extra share references");
7355
7356 pthread_mutex_lock(&ndbcluster_mutex);
7357 for (uint i= 0; i < ndbcluster_open_tables.records;)
7358 {
7359 NDB_SHARE * share = (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables,
7360 i);
7361 if (share->state != NSS_DROPPED)
7362 {
7363 /*
7364 The share kept by the server has not been freed, free it
7365 */
7366 share->state= NSS_DROPPED;
7367 /* ndb_share reference create free */
7368 DBUG_PRINT("NDB_SHARE", ("%s create free use_count: %u",
7369 share->key, share->use_count));
7370 free_share(&share, TRUE);
7371
7372 /**
7373 * This might have altered hash table...not sure if it's stable..
7374 * so we'll restart instead
7375 */
7376 i = 0;
7377 }
7378 else
7379 {
7380 i++;
7381 }
7382 }
7383 pthread_mutex_unlock(&ndbcluster_mutex);
7384 }
7385
7386 close_cached_tables((THD*) 0, (TABLE_LIST*) 0, FALSE, FALSE, FALSE);
7387 if (opt_ndb_extra_logging > 15)
7388 {
7389 sql_print_information("NDB Binlog: remaining open tables: ");
7390 for (uint i= 0; i < ndbcluster_open_tables.records; i++)
7391 {
7392 NDB_SHARE* share = (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables,i);
7393 sql_print_information(" %s.%s state: %u use_count: %u",
7394 share->db,
7395 share->table_name,
7396 (uint)share->state,
7397 share->use_count);
7398 }
7399 }
7400
7401 if (do_ndbcluster_binlog_close_connection == BCCC_restart)
7402 {
7403 pthread_mutex_lock(&injector_mutex);
7404 goto restart_cluster_failure;
7405 }
7406
7407 net_end(&thd->net);
7408 thd->cleanup();
7409 delete thd;
7410
7411 ndb_binlog_thread_running= -1;
7412 ndb_binlog_running= FALSE;
7413 (void) pthread_cond_signal(&injector_cond);
7414
7415 DBUG_PRINT("exit", ("ndb_binlog_thread"));
7416
7417 DBUG_LEAVE; // Must match DBUG_ENTER()
7418 my_thread_end();
7419 pthread_exit(0);
7420 return NULL; // Avoid compiler warnings
7421 }
7422
7423 bool
ndbcluster_show_status_binlog(THD * thd,stat_print_fn * stat_print,enum ha_stat_type stat_type)7424 ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
7425 enum ha_stat_type stat_type)
7426 {
7427 char buf[IO_SIZE];
7428 uint buflen;
7429 ulonglong ndb_latest_epoch= 0;
7430 DBUG_ENTER("ndbcluster_show_status_binlog");
7431
7432 pthread_mutex_lock(&injector_mutex);
7433 if (injector_ndb)
7434 {
7435 char buff1[22],buff2[22],buff3[22],buff4[22],buff5[22];
7436 ndb_latest_epoch= injector_ndb->getLatestGCI();
7437 pthread_mutex_unlock(&injector_mutex);
7438
7439 buflen=
7440 my_snprintf(buf, sizeof(buf),
7441 "latest_epoch=%s, "
7442 "latest_trans_epoch=%s, "
7443 "latest_received_binlog_epoch=%s, "
7444 "latest_handled_binlog_epoch=%s, "
7445 "latest_applied_binlog_epoch=%s",
7446 llstr(ndb_latest_epoch, buff1),
7447 llstr(ndb_get_latest_trans_gci(), buff2),
7448 llstr(ndb_latest_received_binlog_epoch, buff3),
7449 llstr(ndb_latest_handled_binlog_epoch, buff4),
7450 llstr(ndb_latest_applied_binlog_epoch, buff5));
7451 if (stat_print(thd, ndbcluster_hton_name, ndbcluster_hton_name_length,
7452 "binlog", strlen("binlog"),
7453 buf, buflen))
7454 DBUG_RETURN(TRUE);
7455 }
7456 else
7457 pthread_mutex_unlock(&injector_mutex);
7458 DBUG_RETURN(FALSE);
7459 }
7460
/*
  AnyValue carries a ServerId or a Reserved code.
  Bits from position opt_server_id_bits up to bit 30 may carry other
  data, so we ignore them when reading/setting AnyValue.

  332        21        10        0
  10987654321098765432109876543210
  roooooooooooooooooooooooosssssss

  r = Reserved bit, indicates whether
      bits 0-6 (and up) hold a ServerId (0) or
      some special reserved code (1).
  o = Optional bits; depending on the value
      of server-id-bits they are either
      serverid bits or user-specific
      data.
  s = Serverid bits or reserved codes.
      At least 7 bits will be available
      for serverid or reserved codes.

*/
7482
/* Bit 31 of AnyValue: when set, the low bits hold a reserved code */
#define NDB_ANYVALUE_RESERVED_BIT   0x80000000U
/* Reserved bit plus the 7 low bits always used for serverid/reserved codes */
#define NDB_ANYVALUE_RESERVED_MASK  (NDB_ANYVALUE_RESERVED_BIT | 0x7FU)

/* "No logging" reserved code: reserved bit and all 7 code bits set */
#define NDB_ANYVALUE_NOLOGGING_CODE (NDB_ANYVALUE_RESERVED_BIT | 0x7FU)
7487
7488 #ifndef DBUG_OFF
dbug_ndbcluster_anyvalue_set_userbits(Uint32 & anyValue)7489 void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue)
7490 {
7491 /*
7492 Set userData part of AnyValue (if there is one) to
7493 all 1s to test that it is ignored
7494 */
7495 const Uint32 userDataMask = ~(opt_server_id_mask |
7496 NDB_ANYVALUE_RESERVED_BIT);
7497
7498 anyValue |= userDataMask;
7499 }
7500 #endif
7501
ndbcluster_anyvalue_is_reserved(Uint32 anyValue)7502 bool ndbcluster_anyvalue_is_reserved(Uint32 anyValue)
7503 {
7504 return ((anyValue & NDB_ANYVALUE_RESERVED_BIT) != 0);
7505 }
7506
ndbcluster_anyvalue_is_nologging(Uint32 anyValue)7507 bool ndbcluster_anyvalue_is_nologging(Uint32 anyValue)
7508 {
7509 return ((anyValue & NDB_ANYVALUE_RESERVED_MASK) ==
7510 NDB_ANYVALUE_NOLOGGING_CODE);
7511 }
7512
ndbcluster_anyvalue_set_nologging(Uint32 & anyValue)7513 void ndbcluster_anyvalue_set_nologging(Uint32& anyValue)
7514 {
7515 anyValue |= NDB_ANYVALUE_NOLOGGING_CODE;
7516 }
7517
ndbcluster_anyvalue_set_normal(Uint32 & anyValue)7518 void ndbcluster_anyvalue_set_normal(Uint32& anyValue)
7519 {
7520 /* Clear reserved bit and serverid bits */
7521 anyValue &= ~(NDB_ANYVALUE_RESERVED_BIT);
7522 anyValue &= ~(opt_server_id_mask);
7523 }
7524
ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId)7525 bool ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId)
7526 {
7527 return ((serverId & ~opt_server_id_mask) == 0);
7528 }
7529
ndbcluster_anyvalue_get_serverid(Uint32 anyValue)7530 Uint32 ndbcluster_anyvalue_get_serverid(Uint32 anyValue)
7531 {
7532 assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
7533
7534 return (anyValue & opt_server_id_mask);
7535 }
7536
ndbcluster_anyvalue_set_serverid(Uint32 & anyValue,Uint32 serverId)7537 void ndbcluster_anyvalue_set_serverid(Uint32& anyValue, Uint32 serverId)
7538 {
7539 assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
7540 anyValue &= ~(opt_server_id_mask);
7541 anyValue |= (serverId & opt_server_id_mask);
7542 }
7543
#ifdef NDB_WITHOUT_SERVER_ID_BITS

/*
  Built without the --server-id-bits=<bits> option: define
  opt_server_id_mask here with a fixed all-ones value, so no bits of a
  server id are masked off.
*/
ulong opt_server_id_mask = ~(ulong) 0;

#endif
7550
7551 #endif
7552