1 /* Copyright (c) 2014, 2017, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "rpl_msr.h"
24 
25 #include "rpl_rli.h"     // Relay_log_info
26 
27 const char* Multisource_info::default_channel= "";
28 const char* Multisource_info::group_replication_channel_names[] = {
29   "group_replication_applier",
30   "group_replication_recovery"
31 };
32 
add_mi(const char * channel_name,Master_info * mi)33 bool Multisource_info::add_mi(const char* channel_name, Master_info* mi)
34 {
35   DBUG_ENTER("Multisource_info::add_mi");
36 
37   m_channel_map_lock->assert_some_wrlock();
38 
39   mi_map::const_iterator it;
40   std::pair<mi_map::iterator, bool>  ret;
41   bool res= false;
42 
43   /* The check of mi exceeding MAX_CHANNELS shall be done in the caller */
44   DBUG_ASSERT(current_mi_count < MAX_CHANNELS);
45 
46   replication_channel_map::iterator map_it;
47   enum_channel_type type= is_group_replication_channel_name(channel_name)
48     ? GROUP_REPLICATION_CHANNEL: SLAVE_REPLICATION_CHANNEL;
49 
50   map_it= rep_channel_map.find(type);
51 
52   if (map_it == rep_channel_map.end())
53   {
54     std::pair<replication_channel_map::iterator, bool> map_ret =
55       rep_channel_map.insert(replication_channel_map::value_type(type, mi_map()));
56 
57     if (!map_ret.second)
58       DBUG_RETURN(true);
59 
60     map_it = rep_channel_map.find(type);
61   }
62 
63   ret = map_it->second.insert(mi_map::value_type(channel_name, mi));
64 
65   /* If a map insert fails, ret.second is false */
66   if(!ret.second)
67     DBUG_RETURN(true);
68 
69   /* Save the pointer for the default_channel to avoid searching it */
70   if (!strcmp(channel_name, get_default_channel()))
71     default_channel_mi= mi;
72 
73 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
74   res= add_mi_to_rpl_pfs_mi(mi);
75 #endif
76   current_mi_count++;
77 
78   DBUG_RETURN(res);
79 
80 }
81 
get_mi(const char * channel_name)82 Master_info* Multisource_info::get_mi(const char* channel_name)
83 {
84   DBUG_ENTER("Multisource_info::get_mi");
85 
86   m_channel_map_lock->assert_some_lock();
87 
88   DBUG_ASSERT(channel_name != 0);
89 
90   mi_map::iterator it;
91   replication_channel_map::iterator map_it;
92 
93   map_it= rep_channel_map.find(SLAVE_REPLICATION_CHANNEL);
94   if (map_it != rep_channel_map.end())
95   {
96     it= map_it->second.find(channel_name);
97   }
98 
99   if (map_it == rep_channel_map.end() || //If not a slave channel, maybe a group one
100       it == map_it->second.end())
101   {
102     map_it= rep_channel_map.find(GROUP_REPLICATION_CHANNEL);
103     if (map_it == rep_channel_map.end())
104     {
105       DBUG_RETURN(0);
106     }
107     it= map_it->second.find(channel_name);
108     if (it == map_it->second.end())
109     {
110       DBUG_RETURN(0);
111     }
112   }
113 
114   DBUG_RETURN(it->second);
115 }
116 
delete_mi(const char * channel_name)117 void Multisource_info::delete_mi(const char* channel_name)
118 {
119   DBUG_ENTER("Multisource_info::delete_mi");
120 
121   m_channel_map_lock->assert_some_wrlock();
122 
123   Master_info *mi= 0;
124   mi_map::iterator it;
125 
126   DBUG_ASSERT(channel_name != 0);
127 
128   replication_channel_map::iterator map_it;
129   map_it= rep_channel_map.find(SLAVE_REPLICATION_CHANNEL);
130 
131   if (map_it != rep_channel_map.end())
132   {
133     it= map_it->second.find(channel_name);
134   }
135   if (map_it == rep_channel_map.end() || //If not a slave channel, maybe a group one
136       it == map_it->second.end())
137   {
138     map_it= rep_channel_map.find(GROUP_REPLICATION_CHANNEL);
139     DBUG_ASSERT(map_it != rep_channel_map.end());
140 
141     it= map_it->second.find(channel_name);
142     DBUG_ASSERT(it != map_it->second.end());
143   }
144 
145 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
146   int index= -1;
147   /* get the index of mi from rpl_pfs_mi */
148   index= get_index_from_rpl_pfs_mi(channel_name);
149 
150   DBUG_ASSERT(index != -1);
151 
152   /* set the current index to  0  and decrease current_mi_count */
153   rpl_pfs_mi[index] = 0;
154 #endif
155 
156   current_mi_count--;
157 
158   mi= it->second;
159   it->second= 0;
160   /* erase from the map */
161   map_it->second.erase(it);
162 
163   if (default_channel_mi == mi)
164     default_channel_mi= NULL;
165 
166   /* delete the master info */
167   if (mi)
168   {
169     mi->channel_assert_some_wrlock();
170     mi->wait_until_no_reference(current_thd);
171 
172     if(mi->rli)
173     {
174       delete mi->rli;
175     }
176     delete mi;
177   }
178 
179   DBUG_VOID_RETURN;
180 }
181 
182 
is_group_replication_channel_name(const char * channel,bool is_applier)183 bool Multisource_info::is_group_replication_channel_name(const char* channel,
184                                                          bool is_applier)
185 {
186   if (is_applier)
187     return !strcmp(channel, group_replication_channel_names[0]);
188   else
189     return !strcmp(channel, group_replication_channel_names[0]) ||
190            !strcmp(channel, group_replication_channel_names[1]);
191 }
192 
193 
194 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
195 
add_mi_to_rpl_pfs_mi(Master_info * mi)196 bool Multisource_info::add_mi_to_rpl_pfs_mi(Master_info *mi)
197 {
198   DBUG_ENTER("Multisource_info::add_mi_to_rpl_pfs_mi");
199 
200   m_channel_map_lock->assert_some_wrlock();
201 
202   bool res=true; // not added
203 
204   /* Point to this added mi in the rpl_pfs_mi*/
205   for (uint i = 0; i < MAX_CHANNELS; i++)
206   {
207     if (rpl_pfs_mi[i] == 0)
208     {
209       rpl_pfs_mi[i] = mi;
210       res= false;  // success
211       break;
212     }
213   }
214   DBUG_RETURN(res);
215 }
216 
217 
get_index_from_rpl_pfs_mi(const char * channel_name)218 int Multisource_info::get_index_from_rpl_pfs_mi(const char * channel_name)
219 {
220   m_channel_map_lock->assert_some_lock();
221 
222   Master_info* mi= 0;
223   for (uint i= 0; i < MAX_CHANNELS; i++)
224   {
225     mi= rpl_pfs_mi[i];
226     if (mi)
227     {
228       if ( !strcmp(mi->get_channel(), channel_name))
229         return i;
230     }
231   }
232   return -1;
233 }
234 
235 
get_mi_at_pos(uint pos)236 Master_info*  Multisource_info::get_mi_at_pos(uint pos)
237 {
238   DBUG_ENTER("Multisource_info::get_mi_at_pos");
239 
240   m_channel_map_lock->assert_some_lock();
241 
242   if ( pos < MAX_CHANNELS)
243     DBUG_RETURN(rpl_pfs_mi[pos]);
244 
245   DBUG_RETURN(0);
246 }
247 #endif /*WITH_PERFSCHEMA_STORAGE_ENGINE */
248 
249 /* There is only one channel_map for the whole server */
250 Multisource_info channel_map;
251