1 /* Copyright (c) 2006, 2017, Oracle and/or its affiliates.
2    Copyright (c) 2010, 2017, MariaDB Corporation
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA */
16 
17 #include "mariadb.h" // For HAVE_REPLICATION
18 #include "sql_priv.h"
19 #include <my_dir.h>
20 #include "rpl_mi.h"
21 #include "slave.h"
22 #include "strfunc.h"
23 #include "sql_repl.h"
24 
25 #ifdef HAVE_REPLICATION
26 
27 #define DEFAULT_CONNECT_RETRY 60
28 
29 static void init_master_log_pos(Master_info* mi);
30 
Master_info(LEX_CSTRING * connection_name_arg,bool is_slave_recovery)31 Master_info::Master_info(LEX_CSTRING *connection_name_arg,
32                          bool is_slave_recovery)
33   :Slave_reporting_capability("I/O"),
34    ssl(0), ssl_verify_server_cert(1), fd(-1), io_thd(0),
35    rli(is_slave_recovery), port(MYSQL_PORT),
36    checksum_alg_before_fd(BINLOG_CHECKSUM_ALG_UNDEF),
37    connect_retry(DEFAULT_CONNECT_RETRY), inited(0), abort_slave(0),
38    slave_running(MYSQL_SLAVE_NOT_RUN), slave_run_id(0),
39    clock_diff_with_master(0),
40    sync_counter(0), heartbeat_period(0), received_heartbeats(0),
41    master_id(0), prev_master_id(0),
42    using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0),
43    gtid_reconnect_event_skip_count(0), gtid_event_seen(false),
44    in_start_all_slaves(0), in_stop_all_slaves(0), in_flush_all_relay_logs(0),
45    users(0), killed(0),
46    total_ddl_groups(0), total_non_trans_groups(0), total_trans_groups(0)
47 {
48   char *tmp;
49   host[0] = 0; user[0] = 0; password[0] = 0;
50   ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
51   ssl_cipher[0]= 0; ssl_key[0]= 0;
52   ssl_crl[0]= 0; ssl_crlpath[0]= 0;
53 
54   /*
55     Store connection name and lower case connection name
56     It's safe to ignore any OMM errors as this is checked by error()
57   */
58   connection_name.length= cmp_connection_name.length=
59     connection_name_arg->length;
60   if ((connection_name.str= tmp= (char*)
61        my_malloc(connection_name_arg->length*2+2, MYF(MY_WME))))
62   {
63     strmake(tmp, connection_name_arg->str, connection_name.length);
64     tmp+= connection_name_arg->length+1;
65     cmp_connection_name.str= tmp;
66     memcpy(tmp, connection_name_arg->str, connection_name.length+1);
67     my_casedn_str(system_charset_info, tmp);
68   }
69   /*
70     When MySQL restarted, all Rpl_filter settings which aren't in the my.cnf
71     will be lost. If you want to lose a setting after restart, you
72     should add them into my.cnf
73   */
74   rpl_filter= get_or_create_rpl_filter(connection_name.str,
75                                        connection_name.length);
76   copy_filter_setting(rpl_filter, global_rpl_filter);
77 
78   parallel_mode= rpl_filter->get_parallel_mode();
79 
80   my_init_dynamic_array(&ignore_server_ids,
81                         sizeof(global_system_variables.server_id), 16, 16,
82                         MYF(0));
83   bzero((char*) &file, sizeof(file));
84   mysql_mutex_init(key_master_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
85   mysql_mutex_init(key_master_info_data_lock, &data_lock, MY_MUTEX_INIT_FAST);
86   mysql_mutex_init(key_master_info_start_stop_lock, &start_stop_lock,
87                    MY_MUTEX_INIT_SLOW);
88   mysql_mutex_setflags(&run_lock, MYF_NO_DEADLOCK_DETECTION);
89   mysql_mutex_setflags(&data_lock, MYF_NO_DEADLOCK_DETECTION);
90   mysql_mutex_init(key_master_info_sleep_lock, &sleep_lock, MY_MUTEX_INIT_FAST);
91   mysql_cond_init(key_master_info_data_cond, &data_cond, NULL);
92   mysql_cond_init(key_master_info_start_cond, &start_cond, NULL);
93   mysql_cond_init(key_master_info_stop_cond, &stop_cond, NULL);
94   mysql_cond_init(key_master_info_sleep_cond, &sleep_cond, NULL);
95 }
96 
97 
98 /**
99    Wait until no one is using Master_info
100 */
101 
wait_until_free()102 void Master_info::wait_until_free()
103 {
104   mysql_mutex_lock(&sleep_lock);
105   killed= 1;
106   while (users)
107     mysql_cond_wait(&sleep_cond, &sleep_lock);
108   mysql_mutex_unlock(&sleep_lock);
109 }
110 
111 /**
112    Delete master_info
113 */
114 
~Master_info()115 Master_info::~Master_info()
116 {
117   wait_until_free();
118   my_free(const_cast<char*>(connection_name.str));
119   delete_dynamic(&ignore_server_ids);
120   mysql_mutex_destroy(&run_lock);
121   mysql_mutex_destroy(&data_lock);
122   mysql_mutex_destroy(&sleep_lock);
123   mysql_mutex_destroy(&start_stop_lock);
124   mysql_cond_destroy(&data_cond);
125   mysql_cond_destroy(&start_cond);
126   mysql_cond_destroy(&stop_cond);
127   mysql_cond_destroy(&sleep_cond);
128 }
129 
130 /**
131    A comparison function to be supplied as argument to @c sort_dynamic()
132    and @c bsearch()
133 
134    @return -1 if first argument is less, 0 if it equal to, 1 if it is greater
135    than the second
136 */
change_master_id_cmp(const void * id1,const void * id2)137 static int change_master_id_cmp(const void *id1, const void *id2)
138 {
139   return (*(ulong *) id1 - *(ulong *) id2);
140 }
141 
142 /**
143    Reports if the s_id server has been configured to ignore events
144    it generates with
145 
146       CHANGE MASTER IGNORE_SERVER_IDS= ( list of server ids )
147 
148    Method is called from the io thread event receiver filtering.
149 
150    @param      s_id    the master server identifier
151 
152    @retval   TRUE    if s_id is in the list of ignored master  servers,
153    @retval   FALSE   otherwise.
154  */
shall_ignore_server_id(ulong s_id)155 bool Master_info::shall_ignore_server_id(ulong s_id)
156 {
157   if (likely(ignore_server_ids.elements == 1))
158     return (* (ulong*) dynamic_array_ptr(&ignore_server_ids, 0)) == s_id;
159   else
160     return bsearch((const ulong *) &s_id,
161                    ignore_server_ids.buffer,
162                    ignore_server_ids.elements, sizeof(ulong),
163                    change_master_id_cmp) != NULL;
164 }
165 
clear_in_memory_info(bool all)166 void Master_info::clear_in_memory_info(bool all)
167 {
168   init_master_log_pos(this);
169   if (all)
170   {
171     port= MYSQL_PORT;
172     host[0] = 0; user[0] = 0; password[0] = 0;
173     domain_id_filter.clear_ids();
174     reset_dynamic(&ignore_server_ids);
175   }
176 }
177 
178 
179 const char *
using_gtid_astext(enum enum_using_gtid arg)180 Master_info::using_gtid_astext(enum enum_using_gtid arg)
181 {
182   switch (arg)
183   {
184   case USE_GTID_NO:
185     return "No";
186   case USE_GTID_SLAVE_POS:
187     return "Slave_Pos";
188   default:
189     DBUG_ASSERT(arg == USE_GTID_CURRENT_POS);
190     return "Current_Pos";
191   }
192 }
193 
194 
init_master_log_pos(Master_info * mi)195 void init_master_log_pos(Master_info* mi)
196 {
197   DBUG_ENTER("init_master_log_pos");
198 
199   mi->master_log_name[0] = 0;
200   mi->master_log_pos = BIN_LOG_HEADER_SIZE;             // skip magic number
201   mi->using_gtid= Master_info::USE_GTID_NO;
202   mi->gtid_current_pos.reset();
203   mi->events_queued_since_last_gtid= 0;
204   mi->gtid_reconnect_event_skip_count= 0;
205   mi->gtid_event_seen= false;
206 
207   /* Intentionally init ssl_verify_server_cert to 0, no option available  */
208   mi->ssl_verify_server_cert= 0;
209   /*
210     always request heartbeat unless master_heartbeat_period is set
211     explicitly zero.  Here is the default value for heartbeat period
212     if CHANGE MASTER did not specify it.  (no data loss in conversion
213     as hb period has a max)
214   */
215   mi->heartbeat_period= (float) MY_MIN(SLAVE_MAX_HEARTBEAT_PERIOD,
216                                     (slave_net_timeout/2.0));
217   DBUG_ASSERT(mi->heartbeat_period > (float) 0.001
218               || mi->heartbeat_period == 0);
219 
220   DBUG_VOID_RETURN;
221 }
222 
223 /**
224   Parses the IO_CACHE for "key=" and returns the "key".
225   If no '=' found, returns the whole line (for END_MARKER).
226 
227   @param key      [OUT]               Key buffer
228   @param max_size [IN]                Maximum buffer size
229   @param f        [IN]                IO_CACHE file
230   @param found_equal [OUT]            Set true if a '=' was found.
231 
232   @retval 0                           Either "key=" or '\n' found
233   @retval 1                           EOF
234 */
235 static int
read_mi_key_from_file(char * key,int max_size,IO_CACHE * f,bool * found_equal)236 read_mi_key_from_file(char *key, int max_size, IO_CACHE *f, bool *found_equal)
237 {
238   int i= 0, c;
239 
240   DBUG_ENTER("read_key_from_file");
241 
242   *found_equal= false;
243   if (max_size <= 0)
244     DBUG_RETURN(1);
245   for (;;)
246   {
247     if (i >= max_size-1)
248     {
249       key[i] = '\0';
250       DBUG_RETURN(0);
251     }
252     c= my_b_get(f);
253     if (c == my_b_EOF)
254     {
255       DBUG_RETURN(1);
256     }
257     else if (c == '\n')
258     {
259       key[i]= '\0';
260       DBUG_RETURN(0);
261     }
262     else if (c == '=')
263     {
264       key[i]= '\0';
265       *found_equal= true;
266       DBUG_RETURN(0);
267     }
268     else
269     {
270       key[i]= c;
271       ++i;
272     }
273   }
274   /* NotReached */
275 }
276 
277 enum {
278   LINES_IN_MASTER_INFO_WITH_SSL= 14,
279 
280   /* 5.1.16 added value of master_ssl_verify_server_cert */
281   LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT= 15,
282 
283   /* 5.5 added value of master_heartbeat_period */
284   LINE_FOR_MASTER_HEARTBEAT_PERIOD= 16,
285 
286   /* MySQL Cluster 6.3 added master_bind */
287   LINE_FOR_MASTER_BIND = 17,
288 
289   /* 6.0 added value of master_ignore_server_id */
290   LINE_FOR_REPLICATE_IGNORE_SERVER_IDS= 18,
291 
292   /* 6.0 added value of master_uuid */
293   LINE_FOR_MASTER_UUID= 19,
294 
295   /* line for master_retry_count */
296   LINE_FOR_MASTER_RETRY_COUNT= 20,
297 
298   /* line for ssl_crl */
299   LINE_FOR_SSL_CRL= 21,
300 
301   /* line for ssl_crl */
302   LINE_FOR_SSL_CRLPATH= 22,
303 
304   /* MySQL 5.6 fixed-position lines. */
305   LINE_FOR_FIRST_MYSQL_5_6=23,
306   LINE_FOR_LAST_MYSQL_5_6=23,
307   /* Reserved lines for MySQL future versions. */
308   LINE_FOR_LAST_MYSQL_FUTURE=33,
309   /* Number of (fixed-position) lines used when saving master info file */
310   LINES_IN_MASTER_INFO= LINE_FOR_LAST_MYSQL_FUTURE
311 };
312 
init_master_info(Master_info * mi,const char * master_info_fname,const char * slave_info_fname,bool abort_if_no_master_info_file,int thread_mask)313 int init_master_info(Master_info* mi, const char* master_info_fname,
314                      const char* slave_info_fname,
315                      bool abort_if_no_master_info_file,
316                      int thread_mask)
317 {
318   int fd,error;
319   char fname[FN_REFLEN+128];
320   DBUG_ENTER("init_master_info");
321 
322   if (mi->inited)
323   {
324     /*
325       We have to reset read position of relay-log-bin as we may have
326       already been reading from 'hotlog' when the slave was stopped
327       last time. If this case pos_in_file would be set and we would
328       get a crash when trying to read the signature for the binary
329       relay log.
330 
331       We only rewind the read position if we are starting the SQL
332       thread. The handle_slave_sql thread assumes that the read
333       position is at the beginning of the file, and will read the
334       "signature" and then fast-forward to the last position read.
335     */
336     if (thread_mask & SLAVE_SQL)
337     {
338       bool hot_log= FALSE;
339       /*
340          my_b_seek does an implicit flush_io_cache, so we need to:
341 
342          1. check if this log is active (hot)
343          2. if it is we keep log_lock until the seek ends, otherwise
344             release it right away.
345 
346          If we did not take log_lock, SQL thread might race with IO
347          thread for the IO_CACHE mutex.
348 
349        */
350       mysql_mutex_t *log_lock= mi->rli.relay_log.get_log_lock();
351       mysql_mutex_lock(log_lock);
352       hot_log= mi->rli.relay_log.is_active(mi->rli.linfo.log_file_name);
353 
354       if (!hot_log)
355         mysql_mutex_unlock(log_lock);
356 
357       my_b_seek(mi->rli.cur_log, (my_off_t) 0);
358 
359       if (hot_log)
360         mysql_mutex_unlock(log_lock);
361     }
362     DBUG_RETURN(0);
363   }
364 
365   mi->mysql=0;
366   mi->file_id=1;
367   fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
368 
369   /*
370     We need a mutex while we are changing master info parameters to
371     keep other threads from reading bogus info
372   */
373 
374   mysql_mutex_lock(&mi->data_lock);
375   fd = mi->fd;
376 
377   /* does master.info exist ? */
378 
379   if (access(fname,F_OK))
380   {
381     if (abort_if_no_master_info_file)
382     {
383       mysql_mutex_unlock(&mi->data_lock);
384       DBUG_RETURN(0);
385     }
386     /*
387       if someone removed the file from underneath our feet, just close
388       the old descriptor and re-create the old file
389     */
390     if (fd >= 0)
391       mysql_file_close(fd, MYF(MY_WME));
392     if ((fd= mysql_file_open(key_file_master_info,
393                              fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
394     {
395       sql_print_error("Failed to create a new master info file (\
396 file '%s', errno %d)", fname, my_errno);
397       goto err;
398     }
399     if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
400                       MYF(MY_WME)))
401     {
402       sql_print_error("Failed to create a cache on master info file (\
403 file '%s')", fname);
404       goto err;
405     }
406 
407     mi->fd = fd;
408     mi->clear_in_memory_info(false);
409 
410   }
411   else // file exists
412   {
413     if (fd >= 0)
414       reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
415     else
416     {
417       if ((fd= mysql_file_open(key_file_master_info,
418                                fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0 )
419       {
420         sql_print_error("Failed to open the existing master info file (\
421 file '%s', errno %d)", fname, my_errno);
422         goto err;
423       }
424       if (init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
425                         0, MYF(MY_WME)))
426       {
427         sql_print_error("Failed to create a cache on master info file (\
428 file '%s')", fname);
429         goto err;
430       }
431     }
432 
433     mi->fd = fd;
434     int port, connect_retry, master_log_pos, lines;
435     int ssl= 0, ssl_verify_server_cert= 0;
436     float master_heartbeat_period= 0.0;
437     char *first_non_digit;
438     char buf[HOSTNAME_LENGTH+1];
439 
440     /*
441        Starting from 4.1.x master.info has new format. Now its
442        first line contains number of lines in file. By reading this
443        number we will be always distinguish to which version our
444        master.info corresponds to. We can't simply count lines in
445        file since versions before 4.1.x could generate files with more
446        lines than needed.
447        If first line doesn't contain a number or contain number less than
448        LINES_IN_MASTER_INFO_WITH_SSL then such file is treated like file
449        from pre 4.1.1 version.
450        There is no ambiguity when reading an old master.info, as before
451        4.1.1, the first line contained the binlog's name, which is either
452        empty or has an extension (contains a '.'), so can't be confused
453        with an integer.
454 
455        So we're just reading first line and trying to figure which version
456        is this.
457     */
458 
459     /*
460        The first row is temporarily stored in mi->master_log_name,
461        if it is line count and not binlog name (new format) it will be
462        overwritten by the second row later.
463     */
464     if (init_strvar_from_file(mi->master_log_name,
465                               sizeof(mi->master_log_name), &mi->file,
466                               ""))
467       goto errwithmsg;
468 
469     lines= strtoul(mi->master_log_name, &first_non_digit, 10);
470 
471     if (mi->master_log_name[0]!='\0' &&
472         *first_non_digit=='\0' && lines >= LINES_IN_MASTER_INFO_WITH_SSL)
473     {
474       /* Seems to be new format => read master log name from next line */
475       if (init_strvar_from_file(mi->master_log_name,
476             sizeof(mi->master_log_name), &mi->file, ""))
477         goto errwithmsg;
478     }
479     else
480       lines= 7;
481 
482     if (init_intvar_from_file(&master_log_pos, &mi->file, 4) ||
483         init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file, 0) ||
484         init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file, "test") ||
485         init_strvar_from_file(mi->password, SCRAMBLED_PASSWORD_CHAR_LENGTH+1,
486                               &mi->file, 0) ||
487         init_intvar_from_file(&port, &mi->file, MYSQL_PORT) ||
488         init_intvar_from_file(&connect_retry, &mi->file,
489                               DEFAULT_CONNECT_RETRY))
490       goto errwithmsg;
491 
492     /*
493        If file has ssl part use it even if we have server without
494        SSL support. But these options will be ignored later when
495        slave will try connect to master, so in this case warning
496        is printed.
497      */
498     if (lines >= LINES_IN_MASTER_INFO_WITH_SSL)
499     {
500       if (init_intvar_from_file(&ssl, &mi->file, 0) ||
501           init_strvar_from_file(mi->ssl_ca, sizeof(mi->ssl_ca),
502                                 &mi->file, 0) ||
503           init_strvar_from_file(mi->ssl_capath, sizeof(mi->ssl_capath),
504                                 &mi->file, 0) ||
505           init_strvar_from_file(mi->ssl_cert, sizeof(mi->ssl_cert),
506                                 &mi->file, 0) ||
507           init_strvar_from_file(mi->ssl_cipher, sizeof(mi->ssl_cipher),
508                                 &mi->file, 0) ||
509           init_strvar_from_file(mi->ssl_key, sizeof(mi->ssl_key),
510                                 &mi->file, 0))
511         goto errwithmsg;
512 
513       /*
514         Starting from 5.1.16 ssl_verify_server_cert might be
515         in the file
516       */
517       if (lines >= LINE_FOR_MASTER_SSL_VERIFY_SERVER_CERT &&
518           init_intvar_from_file(&ssl_verify_server_cert, &mi->file, 0))
519         goto errwithmsg;
520       /*
521         Starting from 6.0 master_heartbeat_period might be
522         in the file
523       */
524       if (lines >= LINE_FOR_MASTER_HEARTBEAT_PERIOD &&
525           init_floatvar_from_file(&master_heartbeat_period, &mi->file, 0.0))
526         goto errwithmsg;
527       /*
528 	Starting from MySQL Cluster 6.3 master_bind might be in the file
529 	(this is just a reservation to avoid future upgrade problems)
530        */
531       if (lines >= LINE_FOR_MASTER_BIND &&
532 	  init_strvar_from_file(buf, sizeof(buf), &mi->file, ""))
533 	  goto errwithmsg;
534       /*
535         Starting from 6.0 list of server_id of ignorable servers might be
536         in the file
537       */
538       if (lines >= LINE_FOR_REPLICATE_IGNORE_SERVER_IDS &&
539           init_dynarray_intvar_from_file(&mi->ignore_server_ids, &mi->file))
540       {
541         sql_print_error("Failed to initialize master info ignore_server_ids");
542         goto errwithmsg;
543       }
544 
545       /* reserved */
546       if (lines >= LINE_FOR_MASTER_UUID &&
547 	  init_strvar_from_file(buf, sizeof(buf), &mi->file, ""))
548 	  goto errwithmsg;
549 
550       /* Starting from 5.5 the master_retry_count may be in the repository. */
551       if (lines >= LINE_FOR_MASTER_RETRY_COUNT &&
552 	  init_strvar_from_file(buf, sizeof(buf), &mi->file, ""))
553 	  goto errwithmsg;
554 
555       if (lines >= LINE_FOR_SSL_CRLPATH &&
556 	  (init_strvar_from_file(mi->ssl_crl, sizeof(mi->ssl_crl),
557                                  &mi->file, "") ||
558 	   init_strvar_from_file(mi->ssl_crlpath, sizeof(mi->ssl_crlpath),
559                                  &mi->file, "")))
560 	  goto errwithmsg;
561 
562       /*
563         Starting with MariaDB 10.0, we use a key=value syntax, which is nicer
564         in several ways. But we leave a bunch of empty lines to accomodate
565         any future old-style additions in MySQL (this will make it easier for
566         users moving from MariaDB to MySQL, to not have MySQL try to
567         interpret a MariaDB key=value line.)
568       */
569       if (lines >= LINE_FOR_LAST_MYSQL_FUTURE)
570       {
571         uint i;
572         bool got_eq;
573         bool seen_using_gtid= false;
574         bool seen_do_domain_ids=false, seen_ignore_domain_ids=false;
575 
576         /* Skip lines used by / reserved for MySQL >= 5.6. */
577         for (i= LINE_FOR_FIRST_MYSQL_5_6; i <= LINE_FOR_LAST_MYSQL_FUTURE; ++i)
578         {
579           if (init_strvar_from_file(buf, sizeof(buf), &mi->file, ""))
580           goto errwithmsg;
581         }
582 
583         /*
584           Parse any extra key=value lines. read_key_from_file() parses the file
585           for "key=" and returns the "key" if found. The "value" can then the
586           parsed on case by case basis. The "unknown" lines would be ignored to
587           facilitate downgrades.
588           10.0 does not have the END_MARKER before any left-overs at the end
589           of the file. So ignore any but the first occurrence of a key.
590         */
591         while (!read_mi_key_from_file(buf, sizeof(buf), &mi->file, &got_eq))
592         {
593           if (got_eq && !seen_using_gtid && !strcmp(buf, "using_gtid"))
594           {
595             int val;
596             if (!init_intvar_from_file(&val, &mi->file, 0))
597             {
598               if (val == Master_info::USE_GTID_CURRENT_POS)
599                 mi->using_gtid= Master_info::USE_GTID_CURRENT_POS;
600               else if (val == Master_info::USE_GTID_SLAVE_POS)
601                 mi->using_gtid= Master_info::USE_GTID_SLAVE_POS;
602               else
603                 mi->using_gtid= Master_info::USE_GTID_NO;
604               seen_using_gtid= true;
605             } else {
606               sql_print_error("Failed to initialize master info using_gtid");
607               goto errwithmsg;
608             }
609           }
610           else if (got_eq && !seen_do_domain_ids && !strcmp(buf, "do_domain_ids"))
611           {
612             if (mi->domain_id_filter.init_ids(&mi->file,
613                                               Domain_id_filter::DO_DOMAIN_IDS))
614             {
615               sql_print_error("Failed to initialize master info do_domain_ids");
616               goto errwithmsg;
617             }
618             seen_do_domain_ids= true;
619           }
620           else if (got_eq && !seen_ignore_domain_ids &&
621                    !strcmp(buf, "ignore_domain_ids"))
622           {
623             if (mi->domain_id_filter.init_ids(&mi->file,
624                                               Domain_id_filter::IGNORE_DOMAIN_IDS))
625             {
626               sql_print_error("Failed to initialize master info "
627                               "ignore_domain_ids");
628               goto errwithmsg;
629             }
630             seen_ignore_domain_ids= true;
631           }
632           else if (!got_eq && !strcmp(buf, "END_MARKER"))
633           {
634             /*
635               Guard agaist extra left-overs at the end of file, in case a later
636               update causes the file to shrink compared to earlier contents.
637             */
638             break;
639           }
640         }
641       }
642     }
643 
644 #ifndef HAVE_OPENSSL
645     if (ssl)
646       sql_print_warning("SSL information in the master info file "
647                       "('%s') are ignored because this MySQL slave was "
648                       "compiled without SSL support.", fname);
649 #endif /* HAVE_OPENSSL */
650 
651     /*
652       This has to be handled here as init_intvar_from_file can't handle
653       my_off_t types
654     */
655     mi->master_log_pos= (my_off_t) master_log_pos;
656     mi->port= (uint) port;
657     mi->connect_retry= (uint) connect_retry;
658     mi->ssl= (my_bool) ssl;
659     mi->ssl_verify_server_cert= ssl_verify_server_cert;
660     mi->heartbeat_period= MY_MIN(SLAVE_MAX_HEARTBEAT_PERIOD, master_heartbeat_period);
661   }
662   DBUG_PRINT("master_info",("log_file_name: %s  position: %ld",
663                             mi->master_log_name,
664                             (ulong) mi->master_log_pos));
665 
666   mi->rli.mi= mi;
667   if (mi->rli.init(slave_info_fname))
668     goto err;
669 
670   mi->inited = 1;
671   mi->rli.is_relay_log_recovery= FALSE;
672   // now change cache READ -> WRITE - must do this before flush_master_info
673   reinit_io_cache(&mi->file, WRITE_CACHE, 0L, 0, 1);
674   if (unlikely((error= MY_TEST(flush_master_info(mi, TRUE, TRUE)))))
675     sql_print_error("Failed to flush master info file");
676   mysql_mutex_unlock(&mi->data_lock);
677   DBUG_RETURN(error);
678 
679 errwithmsg:
680   sql_print_error("Error reading master configuration");
681 
682 err:
683   if (fd >= 0)
684   {
685     mysql_file_close(fd, MYF(0));
686     end_io_cache(&mi->file);
687   }
688   mi->fd= -1;
689   mysql_mutex_unlock(&mi->data_lock);
690   DBUG_RETURN(1);
691 }
692 
693 
694 /*
695   RETURN
696      2 - flush relay log failed
697      1 - flush master info failed
698      0 - all ok
699 */
flush_master_info(Master_info * mi,bool flush_relay_log_cache,bool need_lock_relay_log)700 int flush_master_info(Master_info* mi,
701                       bool flush_relay_log_cache,
702                       bool need_lock_relay_log)
703 {
704   IO_CACHE* file = &mi->file;
705   char lbuf[22];
706   int err= 0;
707 
708   DBUG_ENTER("flush_master_info");
709   DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));
710 
711   /*
712     Flush the relay log to disk. If we don't do it, then the relay log while
713     have some part (its last kilobytes) in memory only, so if the slave server
714     dies now, with, say, from master's position 100 to 150 in memory only (not
715     on disk), and with position 150 in master.info, then when the slave
716     restarts, the I/O thread will fetch binlogs from 150, so in the relay log
717     we will have "[0, 100] U [150, infinity[" and nobody will notice it, so the
718     SQL thread will jump from 100 to 150, and replication will silently break.
719 
720     When we come to this place in code, relay log may or not be initialized;
721     the caller is responsible for setting 'flush_relay_log_cache' accordingly.
722   */
723   if (flush_relay_log_cache)
724   {
725     mysql_mutex_t *log_lock= mi->rli.relay_log.get_log_lock();
726     IO_CACHE *log_file= mi->rli.relay_log.get_log_file();
727 
728     if (need_lock_relay_log)
729       mysql_mutex_lock(log_lock);
730 
731     mysql_mutex_assert_owner(log_lock);
732     err= flush_io_cache(log_file);
733 
734     if (need_lock_relay_log)
735       mysql_mutex_unlock(log_lock);
736 
737     if (err)
738       DBUG_RETURN(2);
739   }
740 
741   /*
742     produce a line listing the total number and all the ignored server_id:s
743   */
744   char* ignore_server_ids_buf;
745   {
746     ignore_server_ids_buf=
747       (char *) my_malloc((sizeof(global_system_variables.server_id) * 3 + 1) *
748                          (1 + mi->ignore_server_ids.elements), MYF(MY_WME));
749     if (!ignore_server_ids_buf)
750       DBUG_RETURN(1);                           /* error */
751     ulong cur_len= sprintf(ignore_server_ids_buf, "%u",
752                            mi->ignore_server_ids.elements);
753     for (ulong i= 0; i < mi->ignore_server_ids.elements; i++)
754     {
755       ulong s_id;
756       get_dynamic(&mi->ignore_server_ids, (uchar*) &s_id, i);
757       cur_len+= sprintf(ignore_server_ids_buf + cur_len, " %lu", s_id);
758     }
759   }
760 
761   char *do_domain_ids_buf= 0, *ignore_domain_ids_buf= 0;
762 
763   do_domain_ids_buf=
764     mi->domain_id_filter.as_string(Domain_id_filter::DO_DOMAIN_IDS);
765   if (do_domain_ids_buf == NULL)
766   {
767     err= 1;                                     /* error */
768     goto done;
769   }
770 
771   ignore_domain_ids_buf=
772     mi->domain_id_filter.as_string(Domain_id_filter::IGNORE_DOMAIN_IDS);
773   if (ignore_domain_ids_buf == NULL)
774   {
775     err= 1;                                     /* error */
776     goto done;
777   }
778 
779   /*
780     We flushed the relay log BEFORE the master.info file, because if we crash
781     now, we will get a duplicate event in the relay log at restart. If we
782     flushed in the other order, we would get a hole in the relay log.
783     And duplicate is better than hole (with a duplicate, in later versions we
784     can add detection and scrap one event; with a hole there's nothing we can
785     do).
786   */
787 
788   /*
789      In certain cases this code may create master.info files that seems
790      corrupted, because of extra lines filled with garbage in the end
791      file (this happens if new contents take less space than previous
792      contents of file). But because of number of lines in the first line
793      of file we don't care about this garbage.
794   */
795   char heartbeat_buf[FLOATING_POINT_BUFFER];
796   my_fcvt(mi->heartbeat_period, 3, heartbeat_buf, NULL);
797   my_b_seek(file, 0L);
798   my_b_printf(file,
799               "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n"
800               "\n\n\n\n\n\n\n\n\n\n\n"
801               "using_gtid=%d\n"
802               "do_domain_ids=%s\n"
803               "ignore_domain_ids=%s\n"
804               "END_MARKER\n",
805               LINES_IN_MASTER_INFO,
806               mi->master_log_name, llstr(mi->master_log_pos, lbuf),
807               mi->host, mi->user,
808               mi->password, mi->port, mi->connect_retry,
809               (int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert,
810               mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert,
811               heartbeat_buf, "", ignore_server_ids_buf,
812               "", 0,
813               mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid,
814               do_domain_ids_buf, ignore_domain_ids_buf);
815   err= flush_io_cache(file);
816   if (sync_masterinfo_period && !err &&
817       ++(mi->sync_counter) >= sync_masterinfo_period)
818   {
819     err= my_sync(mi->fd, MYF(MY_WME));
820     mi->sync_counter= 0;
821   }
822 
823   /* Fix err; flush_io_cache()/my_sync() may return -1 */
824   err= (err != 0) ? 1 : 0;
825 
826 done:
827   my_free(ignore_server_ids_buf);
828   my_free(do_domain_ids_buf);
829   my_free(ignore_domain_ids_buf);
830   DBUG_RETURN(err);
831 }
832 
833 
end_master_info(Master_info * mi)834 void end_master_info(Master_info* mi)
835 {
836   DBUG_ENTER("end_master_info");
837 
838   if (!mi->inited)
839     DBUG_VOID_RETURN;
840   if (mi->fd >= 0)
841   {
842     end_io_cache(&mi->file);
843     mysql_file_close(mi->fd, MYF(MY_WME));
844     mi->fd = -1;
845   }
846   mi->inited = 0;
847 
848   DBUG_VOID_RETURN;
849 }
850 
851 /* Multi-Master By P.Linux */
get_key_master_info(Master_info * mi,size_t * length,my_bool not_used)852 uchar *get_key_master_info(Master_info *mi, size_t *length,
853                            my_bool not_used __attribute__((unused)))
854 {
855   /* Return lower case name */
856   *length= mi->cmp_connection_name.length;
857   return (uchar*) mi->cmp_connection_name.str;
858 }
859 
860 /*
861   Delete a master info
862 
863   Called from my_hash_delete(&master_info_hash)
864   Stops associated slave threads and frees master_info
865 */
866 
free_key_master_info(Master_info * mi)867 void free_key_master_info(Master_info *mi)
868 {
869   DBUG_ENTER("free_key_master_info");
870   mysql_mutex_unlock(&LOCK_active_mi);
871 
872   /* Ensure that we are not in reset_slave while this is done */
873   mi->lock_slave_threads();
874   terminate_slave_threads(mi,SLAVE_FORCE_ALL);
875   /* We use 2 here instead of 1 just to make it easier when debugging */
876   mi->killed= 2;
877   end_master_info(mi);
878   end_relay_log_info(&mi->rli);
879   mi->unlock_slave_threads();
880   delete mi;
881 
882   mysql_mutex_lock(&LOCK_active_mi);
883   DBUG_VOID_RETURN;
884 }
885 
886 /**
887    Check if connection name for master_info is valid.
888 
889    It's valid if it's a valid system name of length less than
890    MAX_CONNECTION_NAME.
891 
892    @return
893    0 ok
894    1 error
895 */
896 
check_master_connection_name(LEX_CSTRING * name)897 bool check_master_connection_name(LEX_CSTRING *name)
898 {
899   if (name->length >= MAX_CONNECTION_NAME)
900     return 1;
901   return 0;
902 }
903 
904 
905 /**
906    Create a log file with a given suffix.
907 
908    @param
909    res_file_name	Store result here
910    length		Length of res_file_name buffer
911    info_file		Original file name (prefix)
912    append		1 if we should add suffix last (not before ext)
913    suffix		Suffix
914 
915    @note
916    The suffix is added before the extension of the file name prefixed with '-'.
917    The suffix is also converted to lower case and we transform
918    all not safe character, as we do with MySQL table names.
919 
920    If suffix is an empty string, then we don't add any suffix.
921    This is to allow one to use this function also to generate old
922    file names without a prefix.
923 */
924 
create_logfile_name_with_suffix(char * res_file_name,size_t length,const char * info_file,bool append,LEX_CSTRING * suffix)925 void create_logfile_name_with_suffix(char *res_file_name, size_t length,
926                                      const char *info_file, bool append,
927                                      LEX_CSTRING *suffix)
928 {
929   char buff[MAX_CONNECTION_NAME+1],
930     res[MAX_CONNECTION_NAME * MAX_FILENAME_MBWIDTH+1], *p;
931 
932   p= strmake(res_file_name, info_file, length);
933   /* If not empty suffix and there is place left for some part of the suffix */
934   if (suffix->length != 0 && p <= res_file_name + length -1)
935   {
936     const char *info_file_end= info_file + (p - res_file_name);
937     const char *ext= append ? info_file_end : fn_ext2(info_file);
938     size_t res_length, ext_pos, from_length;
939     uint errors;
940 
941     /* Create null terminated string */
942     from_length= strmake(buff, suffix->str, suffix->length) - buff;
943     /* Convert to characters usable in a file name */
944     res_length= strconvert(system_charset_info, buff, from_length,
945                            &my_charset_filename, res, sizeof(res), &errors);
946 
947     ext_pos= (size_t) (ext - info_file);
948     length-= (suffix->length - ext_pos); /* Leave place for extension */
949     p= res_file_name + ext_pos;
950     *p++= '-';                           /* Add separator */
951     p= strmake(p, res, MY_MIN((size_t) (length - (p - res_file_name)),
952                            res_length));
953     /* Add back extension. We have checked above that there is space for it */
954     strmov(p, ext);
955   }
956 }
957 
copy_filter_setting(Rpl_filter * dst_filter,Rpl_filter * src_filter)958 void copy_filter_setting(Rpl_filter* dst_filter, Rpl_filter* src_filter)
959 {
960   char buf[256];
961   String tmp(buf, sizeof(buf), &my_charset_bin);
962 
963   dst_filter->get_do_db(&tmp);
964   if (tmp.is_empty())
965   {
966     src_filter->get_do_db(&tmp);
967     if (!tmp.is_empty())
968       dst_filter->set_do_db(tmp.ptr());
969   }
970 
971   dst_filter->get_do_table(&tmp);
972   if (tmp.is_empty())
973   {
974     src_filter->get_do_table(&tmp);
975     if (!tmp.is_empty())
976       dst_filter->set_do_table(tmp.ptr());
977   }
978 
979   dst_filter->get_ignore_db(&tmp);
980   if (tmp.is_empty())
981   {
982     src_filter->get_ignore_db(&tmp);
983     if (!tmp.is_empty())
984       dst_filter->set_ignore_db(tmp.ptr());
985   }
986 
987   dst_filter->get_ignore_table(&tmp);
988   if (tmp.is_empty())
989   {
990     src_filter->get_ignore_table(&tmp);
991     if (!tmp.is_empty())
992       dst_filter->set_ignore_table(tmp.ptr());
993   }
994 
995   dst_filter->get_wild_do_table(&tmp);
996   if (tmp.is_empty())
997   {
998     src_filter->get_wild_do_table(&tmp);
999     if (!tmp.is_empty())
1000       dst_filter->set_wild_do_table(tmp.ptr());
1001   }
1002 
1003   dst_filter->get_wild_ignore_table(&tmp);
1004   if (tmp.is_empty())
1005   {
1006     src_filter->get_wild_ignore_table(&tmp);
1007     if (!tmp.is_empty())
1008       dst_filter->set_wild_ignore_table(tmp.ptr());
1009   }
1010 
1011   if (dst_filter->rewrite_db_is_empty())
1012   {
1013     if (!src_filter->rewrite_db_is_empty())
1014       dst_filter->copy_rewrite_db(src_filter);
1015   }
1016 }
1017 
Master_info_index()1018 Master_info_index::Master_info_index()
1019 {
1020   size_t filename_length, dir_length;
1021   /*
1022     Create the Master_info index file by prepending 'multi-' before
1023     the master_info_file file name.
1024   */
1025   fn_format(index_file_name, master_info_file, mysql_data_home,
1026             "", MY_UNPACK_FILENAME);
1027   filename_length= strlen(index_file_name) + 1; /* Count 0 byte */
1028   dir_length= dirname_length(index_file_name);
1029   bmove_upp((uchar*) index_file_name + filename_length + 6,
1030             (uchar*) index_file_name + filename_length,
1031             filename_length - dir_length);
1032   memcpy(index_file_name + dir_length, "multi-", 6);
1033 
1034   bzero((char*) &index_file, sizeof(index_file));
1035   index_file.file= -1;
1036 }
1037 
1038 
1039 /**
1040    Free all connection threads
1041 
1042    This is done during early stages of shutdown
1043    to give connection threads and slave threads time
1044    to die before ~Master_info_index is called
1045 */
1046 
free_connections()1047 void Master_info_index::free_connections()
1048 {
1049   mysql_mutex_assert_owner(&LOCK_active_mi);
1050   my_hash_reset(&master_info_hash);
1051 }
1052 
1053 
1054 /**
1055    Free all connection threads and free structures
1056 */
1057 
~Master_info_index()1058 Master_info_index::~Master_info_index()
1059 {
1060   my_hash_free(&master_info_hash);
1061   end_io_cache(&index_file);
1062   if (index_file.file >= 0)
1063     my_close(index_file.file, MYF(MY_WME));
1064 }
1065 
1066 
1067 /* Load All Master_info from master.info.index File
1068  * RETURN:
1069  *   0 - All Success
1070  *   1 - All Fail
1071  *   2 - Some Success, Some Fail
1072  */
1073 
init_all_master_info()1074 bool Master_info_index::init_all_master_info()
1075 {
1076   int thread_mask;
1077   int err_num= 0, succ_num= 0; // The number of success read Master_info
1078   char sign[MAX_CONNECTION_NAME+1];
1079   File index_file_nr;
1080   THD *thd;
1081   DBUG_ENTER("init_all_master_info");
1082 
1083   DBUG_ASSERT(master_info_index);
1084 
1085   if ((index_file_nr= my_open(index_file_name,
1086                               O_RDWR | O_CREAT | O_BINARY ,
1087                               MYF(MY_WME | ME_NOREFRESH))) < 0 ||
1088       my_sync(index_file_nr, MYF(MY_WME)) ||
1089       init_io_cache(&index_file, index_file_nr,
1090                     IO_SIZE, READ_CACHE,
1091                     my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)),
1092                     0, MYF(MY_WME | MY_WAIT_IF_FULL)))
1093   {
1094     if (index_file_nr >= 0)
1095       my_close(index_file_nr,MYF(0));
1096 
1097     sql_print_error("Creation of Master_info index file '%s' failed",
1098                     index_file_name);
1099     DBUG_RETURN(1);
1100   }
1101 
1102   /* Initialize Master_info Hash Table */
1103   if (my_hash_init(&master_info_hash, system_charset_info,
1104                    MAX_REPLICATION_THREAD, 0, 0,
1105                    (my_hash_get_key) get_key_master_info,
1106                    (my_hash_free_key)free_key_master_info, HASH_UNIQUE))
1107   {
1108     sql_print_error("Initializing Master_info hash table failed");
1109     DBUG_RETURN(1);
1110   }
1111 
1112   thd= new THD(next_thread_id());  /* Needed by start_slave_threads */
1113   thd->thread_stack= (char*) &thd;
1114   thd->store_globals();
1115 
1116   reinit_io_cache(&index_file, READ_CACHE, 0L,0,0);
1117   while (!init_strvar_from_file(sign, sizeof(sign),
1118                                 &index_file, NULL))
1119   {
1120     LEX_CSTRING connection_name;
1121     Master_info *mi;
1122     char buf_master_info_file[FN_REFLEN];
1123     char buf_relay_log_info_file[FN_REFLEN];
1124 
1125     connection_name.str=    sign;
1126     connection_name.length= strlen(sign);
1127     if (!(mi= new Master_info(&connection_name, relay_log_recovery)) ||
1128         mi->error())
1129     {
1130       delete mi;
1131       goto error;
1132     }
1133 
1134     init_thread_mask(&thread_mask,mi,0 /*not inverse*/);
1135 
1136     create_logfile_name_with_suffix(buf_master_info_file,
1137                                     sizeof(buf_master_info_file),
1138                                     master_info_file, 0,
1139                                     &mi->cmp_connection_name);
1140     create_logfile_name_with_suffix(buf_relay_log_info_file,
1141                                     sizeof(buf_relay_log_info_file),
1142                                     relay_log_info_file, 0,
1143                                     &mi->cmp_connection_name);
1144     if (global_system_variables.log_warnings > 1)
1145       sql_print_information("Reading Master_info: '%s'  Relay_info:'%s'",
1146                             buf_master_info_file, buf_relay_log_info_file);
1147 
1148     mi->lock_slave_threads();
1149     if (init_master_info(mi, buf_master_info_file, buf_relay_log_info_file,
1150                          0, thread_mask))
1151     {
1152       err_num++;
1153       sql_print_error("Initialized Master_info from '%s' failed",
1154                       buf_master_info_file);
1155       if (!master_info_index->get_master_info(&connection_name,
1156                                               Sql_condition::WARN_LEVEL_NOTE))
1157       {
1158         /* Master_info is not in HASH; Add it */
1159         if (master_info_index->add_master_info(mi, FALSE))
1160           goto error;
1161         succ_num++;
1162         mi->unlock_slave_threads();
1163       }
1164       else
1165       {
1166         /* Master_info already in HASH */
1167         sql_print_error(ER_THD_OR_DEFAULT(current_thd,
1168                                           ER_CONNECTION_ALREADY_EXISTS),
1169                         (int) connection_name.length, connection_name.str,
1170                         (int) connection_name.length, connection_name.str);
1171         mi->unlock_slave_threads();
1172         delete mi;
1173       }
1174       continue;
1175     }
1176     else
1177     {
1178       /* Initialization of Master_info succeeded. Add it to HASH */
1179       if (global_system_variables.log_warnings > 1)
1180         sql_print_information("Initialized Master_info from '%s'",
1181                               buf_master_info_file);
1182       if (master_info_index->get_master_info(&connection_name,
1183                                              Sql_condition::WARN_LEVEL_NOTE))
1184       {
1185         /* Master_info was already registered */
1186         sql_print_error(ER_THD_OR_DEFAULT(current_thd,
1187                                           ER_CONNECTION_ALREADY_EXISTS),
1188                         (int) connection_name.length, connection_name.str,
1189                         (int) connection_name.length, connection_name.str);
1190         mi->unlock_slave_threads();
1191         delete mi;
1192         continue;
1193       }
1194 
1195       /* Master_info was not registered; add it */
1196       if (master_info_index->add_master_info(mi, FALSE))
1197         goto error;
1198       succ_num++;
1199 
1200       if (!opt_skip_slave_start)
1201       {
1202         if (start_slave_threads(current_thd,
1203                                 1 /* need mutex */,
1204                                 1 /* wait for start*/,
1205                                 mi,
1206                                 buf_master_info_file,
1207                                 buf_relay_log_info_file,
1208                                 SLAVE_IO | SLAVE_SQL))
1209         {
1210           sql_print_error("Failed to create slave threads for connection '%.*s'",
1211                           (int) connection_name.length,
1212                           connection_name.str);
1213           continue;
1214         }
1215         if (global_system_variables.log_warnings)
1216           sql_print_information("Started replication for '%.*s'",
1217                                 (int) connection_name.length,
1218                                 connection_name.str);
1219       }
1220       mi->unlock_slave_threads();
1221     }
1222   }
1223   thd->reset_globals();
1224   delete thd;
1225 
1226   if (!err_num) // No Error on read Master_info
1227   {
1228     if (global_system_variables.log_warnings > 1)
1229       sql_print_information("Reading of all Master_info entries succeeded");
1230     DBUG_RETURN(0);
1231   }
1232   if (succ_num) // Have some Error and some Success
1233   {
1234     sql_print_warning("Reading of some Master_info entries failed");
1235     DBUG_RETURN(1);
1236   }
1237 
1238   sql_print_error("Reading of all Master_info entries failed!");
1239   DBUG_RETURN(1);
1240 
1241 error:
1242   thd->reset_globals();
1243   delete thd;
1244   DBUG_RETURN(1);
1245 }
1246 
1247 
1248 /* Write new master.info to master.info.index File */
write_master_name_to_index_file(LEX_CSTRING * name,bool do_sync)1249 bool Master_info_index::write_master_name_to_index_file(LEX_CSTRING *name,
1250                                                         bool do_sync)
1251 {
1252   DBUG_ASSERT(my_b_inited(&index_file) != 0);
1253   DBUG_ENTER("write_master_name_to_index_file");
1254 
1255   /* Don't write default slave to master_info.index */
1256   if (name->length == 0)
1257     DBUG_RETURN(0);
1258 
1259   reinit_io_cache(&index_file, WRITE_CACHE,
1260                   my_b_filelength(&index_file), 0, 0);
1261 
1262   if (my_b_write(&index_file, (uchar*) name->str, name->length) ||
1263       my_b_write(&index_file, (uchar*) "\n", 1) ||
1264       flush_io_cache(&index_file) ||
1265       (do_sync && my_sync(index_file.file, MYF(MY_WME))))
1266   {
1267     sql_print_error("Write of new Master_info for '%.*s' to index file failed",
1268                     (int) name->length, name->str);
1269     DBUG_RETURN(1);
1270   }
1271 
1272   DBUG_RETURN(0);
1273 }
1274 
1275 
1276 /**
1277    Get Master_info for a connection and lock the object from deletion
1278 
1279    @param
1280    connection_name	Connection name
1281    warning		WARN_LEVEL_NOTE -> Don't print anything
1282 			WARN_LEVEL_WARN -> Issue warning if not exists
1283 			WARN_LEVEL_ERROR-> Issue error if not exists
1284 */
1285 
get_master_info(const LEX_CSTRING * connection_name,Sql_condition::enum_warning_level warning)1286 Master_info *get_master_info(const LEX_CSTRING *connection_name,
1287                              Sql_condition::enum_warning_level warning)
1288 {
1289   Master_info *mi;
1290   DBUG_ENTER("get_master_info");
1291 
1292   /* Protect against inserts into hash */
1293   mysql_mutex_lock(&LOCK_active_mi);
1294   /*
1295     The following can only be true during shutdown when slave has been killed
1296     but some other threads are still trying to access slave statistics.
1297   */
1298   if (unlikely(!master_info_index))
1299   {
1300     if (warning != Sql_condition::WARN_LEVEL_NOTE)
1301       my_error(WARN_NO_MASTER_INFO,
1302                MYF(warning == Sql_condition::WARN_LEVEL_WARN ?
1303                    ME_JUST_WARNING : 0),
1304                (int) connection_name->length, connection_name->str);
1305     mysql_mutex_unlock(&LOCK_active_mi);
1306     DBUG_RETURN(0);
1307   }
1308   if ((mi= master_info_index->get_master_info(connection_name, warning)))
1309   {
1310     /*
1311       We have to use sleep_lock here. If we would use LOCK_active_mi
1312       then we would take locks in wrong order in Master_info::release()
1313     */
1314     mysql_mutex_lock(&mi->sleep_lock);
1315     mi->users++;
1316     DBUG_PRINT("info",("users: %d", mi->users));
1317     mysql_mutex_unlock(&mi->sleep_lock);
1318   }
1319   mysql_mutex_unlock(&LOCK_active_mi);
1320   DBUG_RETURN(mi);
1321 }
1322 
1323 
1324 /**
1325    Release master info.
1326    Signals ~Master_info that it's now safe to delete it
1327 */
1328 
release()1329 void Master_info::release()
1330 {
1331   mysql_mutex_lock(&sleep_lock);
1332   if (!--users && killed)
1333   {
1334     /* Signal ~Master_info that it's ok to now free it */
1335     mysql_cond_signal(&sleep_cond);
1336   }
1337   mysql_mutex_unlock(&sleep_lock);
1338 }
1339 
1340 
1341 /**
1342    Get Master_info for a connection
1343 
1344    @param
1345    connection_name	Connection name
1346    warning		WARN_LEVEL_NOTE -> Don't print anything
1347 			WARN_LEVEL_WARN -> Issue warning if not exists
1348 			WARN_LEVEL_ERROR-> Issue error if not exists
1349 */
1350 
1351 Master_info *
get_master_info(const LEX_CSTRING * connection_name,Sql_condition::enum_warning_level warning)1352 Master_info_index::get_master_info(const LEX_CSTRING *connection_name,
1353                                    Sql_condition::enum_warning_level warning)
1354 {
1355   Master_info *mi;
1356   char buff[MAX_CONNECTION_NAME+1], *res;
1357   size_t buff_length;
1358   DBUG_ENTER("get_master_info");
1359   DBUG_PRINT("enter",
1360              ("connection_name: '%.*s'", (int) connection_name->length,
1361               connection_name->str));
1362 
1363   /* Make name lower case for comparison */
1364   res= strmake(buff, connection_name->str, connection_name->length);
1365   my_casedn_str(system_charset_info, buff);
1366   buff_length= (size_t) (res-buff);
1367 
1368   mi= (Master_info*) my_hash_search(&master_info_hash,
1369                                     (uchar*) buff, buff_length);
1370   if (!mi && warning != Sql_condition::WARN_LEVEL_NOTE)
1371   {
1372     my_error(WARN_NO_MASTER_INFO,
1373              MYF(warning == Sql_condition::WARN_LEVEL_WARN ? ME_JUST_WARNING :
1374                  0),
1375              (int) connection_name->length,
1376              connection_name->str);
1377   }
1378   DBUG_RETURN(mi);
1379 }
1380 
1381 
1382 /* Check Master_host & Master_port is duplicated or not */
check_duplicate_master_info(LEX_CSTRING * name_arg,const char * host,uint port)1383 bool Master_info_index::check_duplicate_master_info(LEX_CSTRING *name_arg,
1384                                                     const char *host,
1385                                                     uint port)
1386 {
1387   Master_info *mi;
1388   DBUG_ENTER("check_duplicate_master_info");
1389 
1390   mysql_mutex_assert_owner(&LOCK_active_mi);
1391   DBUG_ASSERT(master_info_index);
1392 
1393   /* Get full host and port name */
1394   if ((mi= master_info_index->get_master_info(name_arg,
1395                                               Sql_condition::WARN_LEVEL_NOTE)))
1396   {
1397     if (!host)
1398       host= mi->host;
1399     if (!port)
1400       port= mi->port;
1401   }
1402   if (!host || !port)
1403     DBUG_RETURN(FALSE);                         // Not comparable yet
1404 
1405   for (uint i= 0; i < master_info_hash.records; ++i)
1406   {
1407     Master_info *tmp_mi;
1408     tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i);
1409     if (tmp_mi == mi)
1410       continue;                                 // Current connection
1411     if (!strcasecmp(host, tmp_mi->host) && port == tmp_mi->port)
1412     {
1413       my_error(ER_CONNECTION_ALREADY_EXISTS, MYF(0),
1414                (int) name_arg->length,
1415                name_arg->str,
1416                (int) tmp_mi->connection_name.length,
1417                tmp_mi->connection_name.str);
1418       DBUG_RETURN(TRUE);
1419     }
1420   }
1421   DBUG_RETURN(FALSE);
1422 }
1423 
1424 
1425 /* Add a Master_info class to Hash Table */
add_master_info(Master_info * mi,bool write_to_file)1426 bool Master_info_index::add_master_info(Master_info *mi, bool write_to_file)
1427 {
1428   /*
1429     We have to protect against shutdown to ensure we are not calling
1430     my_hash_insert() while my_hash_free() is in progress
1431   */
1432   if (unlikely(shutdown_in_progress) ||
1433       !my_hash_insert(&master_info_hash, (uchar*) mi))
1434   {
1435     if (global_system_variables.log_warnings > 1)
1436       sql_print_information("Added new Master_info '%.*s' to hash table",
1437                             (int) mi->connection_name.length,
1438                             mi->connection_name.str);
1439     if (write_to_file)
1440       return write_master_name_to_index_file(&mi->connection_name, 1);
1441     return FALSE;
1442   }
1443 
1444   /* Impossible error (EOM) ? */
1445   sql_print_error("Adding new entry '%.*s' to master_info failed",
1446                   (int) mi->connection_name.length,
1447                   mi->connection_name.str);
1448   return TRUE;
1449 }
1450 
1451 
1452 /**
1453    Remove a Master_info class From Hash Table
1454 
1455    TODO: Change this to use my_rename() to make the file name creation
1456    atomic
1457 */
1458 
remove_master_info(Master_info * mi)1459 bool Master_info_index::remove_master_info(Master_info *mi)
1460 {
1461   DBUG_ENTER("remove_master_info");
1462   mysql_mutex_assert_owner(&LOCK_active_mi);
1463 
1464   // Delete Master_info and rewrite others to file
1465   if (!my_hash_delete(&master_info_hash, (uchar*) mi))
1466   {
1467     File index_file_nr;
1468 
1469     // Close IO_CACHE and FILE handler fisrt
1470     end_io_cache(&index_file);
1471     my_close(index_file.file, MYF(MY_WME));
1472 
1473     // Reopen File and truncate it
1474     if ((index_file_nr= my_open(index_file_name,
1475                                 O_RDWR | O_CREAT | O_TRUNC | O_BINARY ,
1476                                 MYF(MY_WME))) < 0 ||
1477         init_io_cache(&index_file, index_file_nr,
1478                       IO_SIZE, WRITE_CACHE,
1479                       my_seek(index_file_nr,0L,MY_SEEK_END,MYF(0)),
1480                       0, MYF(MY_WME | MY_WAIT_IF_FULL)))
1481     {
1482       int error= my_errno;
1483       if (index_file_nr >= 0)
1484         my_close(index_file_nr,MYF(0));
1485 
1486       sql_print_error("Create of Master Info Index file '%s' failed with "
1487                       "error: %M",
1488                       index_file_name, error);
1489       DBUG_RETURN(TRUE);
1490     }
1491 
1492     // Rewrite Master_info.index
1493     for (uint i= 0; i< master_info_hash.records; ++i)
1494     {
1495       Master_info *tmp_mi;
1496       tmp_mi= (Master_info *) my_hash_element(&master_info_hash, i);
1497       write_master_name_to_index_file(&tmp_mi->connection_name, 0);
1498     }
1499     if (my_sync(index_file_nr, MYF(MY_WME)))
1500       DBUG_RETURN(TRUE);
1501   }
1502   DBUG_RETURN(FALSE);
1503 }
1504 
1505 
1506 /**
1507    give_error_if_slave_running()
1508 
1509    @param
1510    already_locked  0 if we need to lock, 1 if we have LOCK_active_mi_locked
1511 
1512    @return
1513    TRUE  	If some slave is running.  An error is printed
1514    FALSE	No slave is running
1515 */
1516 
give_error_if_slave_running(bool already_locked)1517 bool give_error_if_slave_running(bool already_locked)
1518 {
1519   bool ret= 0;
1520   DBUG_ENTER("give_error_if_slave_running");
1521 
1522   if (!already_locked)
1523     mysql_mutex_lock(&LOCK_active_mi);
1524   if (!master_info_index)
1525   {
1526     my_error(ER_SERVER_SHUTDOWN, MYF(0));
1527     ret= 1;
1528   }
1529   else
1530   {
1531     HASH *hash= &master_info_index->master_info_hash;
1532     for (uint i= 0; i< hash->records; ++i)
1533     {
1534       Master_info *mi;
1535       mi= (Master_info *) my_hash_element(hash, i);
1536       if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
1537       {
1538         my_error(ER_SLAVE_MUST_STOP, MYF(0), (int) mi->connection_name.length,
1539                  mi->connection_name.str);
1540         ret= 1;
1541         break;
1542       }
1543     }
1544   }
1545   if (!already_locked)
1546     mysql_mutex_unlock(&LOCK_active_mi);
1547   DBUG_RETURN(ret);
1548 }
1549 
1550 
1551 /**
1552    any_slave_sql_running()
1553 
1554    @param
1555    already_locked  0 if we need to lock, 1 if we have LOCK_active_mi_locked
1556 
1557    @return
1558    0            No Slave SQL thread is running
1559    #		Number of slave SQL thread running
1560 
1561    Note that during shutdown we return 1. This is needed to ensure we
1562    don't try to resize thread pool during shutdown as during shutdown
1563    master_info_hash may be freeing the hash and during that time
1564    hash entries can't be accessed.
1565 */
1566 
any_slave_sql_running(bool already_locked)1567 uint any_slave_sql_running(bool already_locked)
1568 {
1569   uint count= 0;
1570   HASH *hash;
1571   DBUG_ENTER("any_slave_sql_running");
1572 
1573   if (!already_locked)
1574     mysql_mutex_lock(&LOCK_active_mi);
1575   if (unlikely(shutdown_in_progress || !master_info_index))
1576     count= 1;
1577   else
1578   {
1579     hash= &master_info_index->master_info_hash;
1580     for (uint i= 0; i< hash->records; ++i)
1581     {
1582       Master_info *mi= (Master_info *)my_hash_element(hash, i);
1583       if (mi->rli.slave_running != MYSQL_SLAVE_NOT_RUN)
1584         count++;
1585     }
1586   }
1587   if (!already_locked)
1588     mysql_mutex_unlock(&LOCK_active_mi);
1589   DBUG_RETURN(count);
1590 }
1591 
1592 
1593 /**
1594    Master_info_index::start_all_slaves()
1595 
1596    Start all slaves that was not running.
1597 
1598    @return
1599    TRUE  	Error
1600    FALSE	Everything ok.
1601 
1602    This code is written so that we don't keep LOCK_active_mi active
1603    while we are starting a slave.
1604 */
1605 
start_all_slaves(THD * thd)1606 bool Master_info_index::start_all_slaves(THD *thd)
1607 {
1608   bool result= FALSE;
1609   DBUG_ENTER("start_all_slaves");
1610   mysql_mutex_assert_owner(&LOCK_active_mi);
1611 
1612   for (uint i= 0; i< master_info_hash.records; i++)
1613   {
1614     Master_info *mi;
1615     mi= (Master_info *) my_hash_element(&master_info_hash, i);
1616     mi->in_start_all_slaves= 0;
1617   }
1618 
1619   for (uint i= 0; i< master_info_hash.records; )
1620   {
1621     int error;
1622     Master_info *mi;
1623     mi= (Master_info *) my_hash_element(&master_info_hash, i);
1624 
1625     /*
1626       Try to start all slaves that are configured (host is defined)
1627       and are not already running
1628     */
1629     if (!((mi->slave_running == MYSQL_SLAVE_NOT_RUN ||
1630            !mi->rli.slave_running) && *mi->host) ||
1631         mi->in_start_all_slaves)
1632     {
1633       i++;
1634       continue;
1635     }
1636     mi->in_start_all_slaves= 1;
1637 
1638     mysql_mutex_lock(&mi->sleep_lock);
1639     mi->users++;                                // Mark used
1640     mysql_mutex_unlock(&mi->sleep_lock);
1641     mysql_mutex_unlock(&LOCK_active_mi);
1642     error= start_slave(thd, mi, 1);
1643     mi->release();
1644     mysql_mutex_lock(&LOCK_active_mi);
1645     if (unlikely(error))
1646     {
1647       my_error(ER_CANT_START_STOP_SLAVE, MYF(0),
1648                "START",
1649                (int) mi->connection_name.length,
1650                mi->connection_name.str);
1651       result= 1;
1652       if (error < 0)                            // fatal error
1653         break;
1654     }
1655     else if (thd)
1656       push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
1657                           ER_SLAVE_STARTED, ER_THD(thd, ER_SLAVE_STARTED),
1658                           (int) mi->connection_name.length,
1659                           mi->connection_name.str);
1660     /* Restart from first element as master_info_hash may have changed */
1661     i= 0;
1662     continue;
1663   }
1664   DBUG_RETURN(result);
1665 }
1666 
1667 
1668 /**
1669    Master_info_index::stop_all_slaves()
1670 
1671    Start all slaves that was not running.
1672 
1673    @param	thread id from user
1674 
1675    @return
1676    TRUE  	Error
1677    FALSE	Everything ok.
1678 
1679    This code is written so that we don't keep LOCK_active_mi active
1680    while we are stopping a slave.
1681 */
1682 
stop_all_slaves(THD * thd)1683 bool Master_info_index::stop_all_slaves(THD *thd)
1684 {
1685   bool result= FALSE;
1686   DBUG_ENTER("stop_all_slaves");
1687   mysql_mutex_assert_owner(&LOCK_active_mi);
1688   DBUG_ASSERT(thd);
1689 
1690   for (uint i= 0; i< master_info_hash.records; i++)
1691   {
1692     Master_info *mi;
1693     mi= (Master_info *) my_hash_element(&master_info_hash, i);
1694     mi->in_stop_all_slaves= 0;
1695   }
1696 
1697   for (uint i= 0; i< master_info_hash.records ;)
1698   {
1699     int error;
1700     Master_info *mi;
1701     mi= (Master_info *) my_hash_element(&master_info_hash, i);
1702     if (!(mi->slave_running != MYSQL_SLAVE_NOT_RUN ||
1703           mi->rli.slave_running) ||
1704         mi->in_stop_all_slaves)
1705     {
1706       i++;
1707       continue;
1708     }
1709     mi->in_stop_all_slaves= 1;                  // Protection for loops
1710 
1711     mysql_mutex_lock(&mi->sleep_lock);
1712     mi->users++;                                // Mark used
1713     mysql_mutex_unlock(&mi->sleep_lock);
1714     mysql_mutex_unlock(&LOCK_active_mi);
1715     error= stop_slave(thd, mi, 1);
1716     mi->release();
1717     mysql_mutex_lock(&LOCK_active_mi);
1718     if (unlikely(error))
1719     {
1720       my_error(ER_CANT_START_STOP_SLAVE, MYF(0),
1721                "STOP",
1722                (int) mi->connection_name.length,
1723                mi->connection_name.str);
1724       result= 1;
1725       if (error < 0)                            // Fatal error
1726         break;
1727     }
1728     else
1729       push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
1730                           ER_SLAVE_STOPPED, ER_THD(thd, ER_SLAVE_STOPPED),
1731                           (int) mi->connection_name.length,
1732                           mi->connection_name.str);
1733     /* Restart from first element as master_info_hash may have changed */
1734     i= 0;
1735     continue;
1736   }
1737   DBUG_RETURN(result);
1738 }
1739 
Domain_id_filter()1740 Domain_id_filter::Domain_id_filter() : m_filter(false)
1741 {
1742   for (int i= DO_DOMAIN_IDS; i <= IGNORE_DOMAIN_IDS; i ++)
1743   {
1744     my_init_dynamic_array(&m_domain_ids[i], sizeof(ulong), 16, 16, MYF(0));
1745   }
1746 }
1747 
~Domain_id_filter()1748 Domain_id_filter::~Domain_id_filter()
1749 {
1750   for (int i= DO_DOMAIN_IDS; i <= IGNORE_DOMAIN_IDS; i ++)
1751   {
1752     delete_dynamic(&m_domain_ids[i]);
1753   }
1754 }
1755 
1756 /**
1757   Update m_filter flag for the current group by looking up its domain id in the
1758   domain ids list. DO_DOMAIN_IDS list is only looked-up is both (do & ignore)
1759   list are non-empty.
1760 */
do_filter(ulong domain_id)1761 void Domain_id_filter::do_filter(ulong domain_id)
1762 {
1763   DYNAMIC_ARRAY *do_domain_ids= &m_domain_ids[DO_DOMAIN_IDS];
1764   DYNAMIC_ARRAY *ignore_domain_ids= &m_domain_ids[IGNORE_DOMAIN_IDS];
1765 
1766   if (do_domain_ids->elements > 0)
1767   {
1768     if (likely(do_domain_ids->elements == 1))
1769       m_filter= ((* (ulong *) dynamic_array_ptr(do_domain_ids, 0))
1770                  != domain_id);
1771     else
1772       m_filter= (bsearch((const ulong *) &domain_id, do_domain_ids->buffer,
1773                          do_domain_ids->elements, sizeof(ulong),
1774                          change_master_id_cmp) == NULL);
1775   }
1776   else if (ignore_domain_ids->elements > 0)
1777   {
1778     if (likely(ignore_domain_ids->elements == 1))
1779       m_filter= ((* (ulong *) dynamic_array_ptr(ignore_domain_ids, 0)) ==
1780                  domain_id);
1781     else
1782       m_filter= (bsearch((const ulong *) &domain_id, ignore_domain_ids->buffer,
1783                          ignore_domain_ids->elements, sizeof(ulong),
1784                          change_master_id_cmp) != NULL);
1785   }
1786   return;
1787 }
1788 
1789 /**
1790   Reset m_filter. It should be called when IO thread receives COMMIT_EVENT or
1791   XID_EVENT.
1792 */
reset_filter()1793 void Domain_id_filter::reset_filter()
1794 {
1795   m_filter= false;
1796 }
1797 
clear_ids()1798 void Domain_id_filter::clear_ids()
1799 {
1800   reset_dynamic(&m_domain_ids[DO_DOMAIN_IDS]);
1801   reset_dynamic(&m_domain_ids[IGNORE_DOMAIN_IDS]);
1802 }
1803 
1804 /**
1805   Update the do/ignore domain id filter lists.
1806 
1807   @param do_ids     [IN]            domain ids to be kept
1808   @param ignore_ids [IN]            domain ids to be filtered out
1809   @param using_gtid [IN]            use GTID?
1810 
1811   @retval false                     Success
1812           true                      Error
1813 */
update_ids(DYNAMIC_ARRAY * do_ids,DYNAMIC_ARRAY * ignore_ids,bool using_gtid)1814 bool Domain_id_filter::update_ids(DYNAMIC_ARRAY *do_ids,
1815                                   DYNAMIC_ARRAY *ignore_ids,
1816                                   bool using_gtid)
1817 {
1818   bool do_list_empty, ignore_list_empty;
1819 
1820   if (do_ids)
1821   {
1822     do_list_empty= (do_ids->elements > 0) ? false : true;
1823   } else {
1824     do_list_empty= (m_domain_ids[DO_DOMAIN_IDS].elements > 0) ? false : true;
1825   }
1826 
1827   if (ignore_ids)
1828   {
1829     ignore_list_empty= (ignore_ids->elements > 0) ? false : true;
1830   } else {
1831     ignore_list_empty= (m_domain_ids[IGNORE_DOMAIN_IDS].elements > 0) ? false :
1832       true;
1833   }
1834 
1835   if (!do_list_empty && !ignore_list_empty)
1836   {
1837     sql_print_error("Both DO_DOMAIN_IDS & IGNORE_DOMAIN_IDS lists can't be "
1838                     "non-empty at the same time");
1839     return true;
1840   }
1841 
1842   if (using_gtid == Master_info::USE_GTID_NO &&
1843       (!do_list_empty || !ignore_list_empty))
1844   {
1845     sql_print_error("DO_DOMAIN_IDS or IGNORE_DOMAIN_IDS lists can't be "
1846                     "non-empty in non-GTID mode (MASTER_USE_GTID=no)");
1847     return true;
1848   }
1849 
1850   if (do_ids)
1851     update_change_master_ids(do_ids, &m_domain_ids[DO_DOMAIN_IDS]);
1852 
1853   if (ignore_ids)
1854     update_change_master_ids(ignore_ids, &m_domain_ids[IGNORE_DOMAIN_IDS]);
1855 
1856   m_filter= false;
1857 
1858   return false;
1859 }
1860 
1861 /**
1862   Serialize and store the ids from domain id lists into the thd's protocol
1863   buffer.
1864 
1865   @param thd [IN]                   thread handler
1866 
1867   @retval void
1868 */
store_ids(THD * thd)1869 void Domain_id_filter::store_ids(THD *thd)
1870 {
1871   for (int i= DO_DOMAIN_IDS; i <= IGNORE_DOMAIN_IDS; i ++)
1872   {
1873     prot_store_ids(thd, &m_domain_ids[i]);
1874   }
1875 }
1876 
1877 /**
1878   Initialize the given domain_id list (DYNAMIC_ARRAY) with the
1879   space-separated list of numbers from the specified IO_CACHE where
1880   the first number represents the total number of entries to follows.
1881 
1882   @param f    [IN]                  IO_CACHE file
1883   @param type [IN]                  domain id list type
1884 
1885   @retval false                     Success
1886           true                      Error
1887 */
init_ids(IO_CACHE * f,enum_list_type type)1888 bool Domain_id_filter::init_ids(IO_CACHE *f, enum_list_type type)
1889 {
1890   return init_dynarray_intvar_from_file(&m_domain_ids[type], f);
1891 }
1892 
1893 /**
1894   Return the elements of the give domain id list type as string.
1895 
1896   @param type [IN]                  domain id list type
1897 
1898   @retval                           a string buffer storing the total number
1899                                     of elements followed by the individual
1900                                     elements (space-separated) in the
1901                                     specified list.
1902 
1903   Note: Its caller's responsibility to free the returned string buffer.
1904 */
as_string(enum_list_type type)1905 char *Domain_id_filter::as_string(enum_list_type type)
1906 {
1907   char *buf;
1908   size_t sz;
1909   DYNAMIC_ARRAY *ids= &m_domain_ids[type];
1910 
1911   sz= (sizeof(ulong) * 3 + 1) * (1 + ids->elements);
1912 
1913   if (!(buf= (char *) my_malloc(sz, MYF(MY_WME))))
1914     return NULL;
1915 
1916   // Store the total number of elements followed by the individual elements.
1917   size_t cur_len= sprintf(buf, "%u", ids->elements);
1918   sz-= cur_len;
1919 
1920   for (uint i= 0; i < ids->elements; i++)
1921   {
1922     ulong domain_id;
1923     get_dynamic(ids, (void *) &domain_id, i);
1924     cur_len+= my_snprintf(buf + cur_len, sz, " %lu", domain_id);
1925     sz-= cur_len;
1926   }
1927   return buf;
1928 }
1929 
update_change_master_ids(DYNAMIC_ARRAY * new_ids,DYNAMIC_ARRAY * old_ids)1930 void update_change_master_ids(DYNAMIC_ARRAY *new_ids, DYNAMIC_ARRAY *old_ids)
1931 {
1932   reset_dynamic(old_ids);
1933 
1934   /* bsearch requires an ordered list. */
1935   sort_dynamic(new_ids, change_master_id_cmp);
1936 
1937   for (uint i= 0; i < new_ids->elements; i++)
1938   {
1939     ulong id;
1940     get_dynamic(new_ids, (void *) &id, i);
1941 
1942     if (bsearch((const ulong *) &id, old_ids->buffer, old_ids->elements,
1943                 sizeof(ulong), change_master_id_cmp) == NULL)
1944     {
1945       insert_dynamic(old_ids, (ulong *) &id);
1946     }
1947   }
1948   return;
1949 }
1950 
1951 /**
1952   Serialize and store the ids from the given ids DYNAMIC_ARRAY into the thd's
1953   protocol buffer.
1954 
1955   @param thd [IN]                   thread handler
1956   @param ids [IN]                   ids list
1957 
1958   @retval void
1959 */
1960 
prot_store_ids(THD * thd,DYNAMIC_ARRAY * ids)1961 void prot_store_ids(THD *thd, DYNAMIC_ARRAY *ids)
1962 {
1963   char buff[FN_REFLEN];
1964   uint i, cur_len;
1965 
1966   for (i= 0, buff[0]= 0, cur_len= 0; i < ids->elements; i++)
1967   {
1968     ulong id, len;
1969     char dbuff[FN_REFLEN];
1970     get_dynamic(ids, (void *) &id, i);
1971     len= sprintf(dbuff, (i == 0 ? "%lu" : ", %lu"), id);
1972     if (cur_len + len + 4 > FN_REFLEN)
1973     {
1974       /*
1975         break the loop whenever remained space could not fit
1976         ellipses on the next cycle
1977       */
1978       sprintf(dbuff + cur_len, "...");
1979       break;
1980     }
1981     cur_len += sprintf(buff + cur_len, "%s", dbuff);
1982   }
1983   thd->protocol->store(buff, &my_charset_bin);
1984   return;
1985 }
1986 
flush_all_relay_logs()1987 bool Master_info_index::flush_all_relay_logs()
1988 {
1989   DBUG_ENTER("flush_all_relay_logs");
1990   bool result= false;
1991   int error= 0;
1992   mysql_mutex_lock(&LOCK_active_mi);
1993   for (uint i= 0; i< master_info_hash.records; i++)
1994   {
1995     Master_info *mi;
1996     mi= (Master_info *) my_hash_element(&master_info_hash, i);
1997     mi->in_flush_all_relay_logs= 0;
1998   }
1999   for (uint i=0; i < master_info_hash.records;)
2000   {
2001     Master_info *mi;
2002     mi= (Master_info *)my_hash_element(&master_info_hash, i);
2003     DBUG_ASSERT(mi);
2004 
2005     if (mi->in_flush_all_relay_logs)
2006     {
2007       i++;
2008       continue;
2009     }
2010     mi->in_flush_all_relay_logs= 1;
2011 
2012     mysql_mutex_lock(&mi->sleep_lock);
2013     mi->users++;                                // Mark used
2014     mysql_mutex_unlock(&mi->sleep_lock);
2015     mysql_mutex_unlock(&LOCK_active_mi);
2016 
2017     mysql_mutex_lock(&mi->data_lock);
2018     error= rotate_relay_log(mi);
2019     mysql_mutex_unlock(&mi->data_lock);
2020     mi->release();
2021     mysql_mutex_lock(&LOCK_active_mi);
2022 
2023     if (unlikely(error))
2024     {
2025       result= true;
2026       break;
2027     }
2028     /* Restart from first element as master_info_hash may have changed */
2029     i= 0;
2030     continue;
2031   }
2032   mysql_mutex_unlock(&LOCK_active_mi);
2033   DBUG_RETURN(result);
2034 }
2035 
2036 #endif /* HAVE_REPLICATION */
2037