1 /* Copyright (c) 2014, 2017, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "rpl_msr.h"
24
25 #include "rpl_rli.h" // Relay_log_info
26
27 const char* Multisource_info::default_channel= "";
28 const char* Multisource_info::group_replication_channel_names[] = {
29 "group_replication_applier",
30 "group_replication_recovery"
31 };
32
add_mi(const char * channel_name,Master_info * mi)33 bool Multisource_info::add_mi(const char* channel_name, Master_info* mi)
34 {
35 DBUG_ENTER("Multisource_info::add_mi");
36
37 m_channel_map_lock->assert_some_wrlock();
38
39 mi_map::const_iterator it;
40 std::pair<mi_map::iterator, bool> ret;
41 bool res= false;
42
43 /* The check of mi exceeding MAX_CHANNELS shall be done in the caller */
44 DBUG_ASSERT(current_mi_count < MAX_CHANNELS);
45
46 replication_channel_map::iterator map_it;
47 enum_channel_type type= is_group_replication_channel_name(channel_name)
48 ? GROUP_REPLICATION_CHANNEL: SLAVE_REPLICATION_CHANNEL;
49
50 map_it= rep_channel_map.find(type);
51
52 if (map_it == rep_channel_map.end())
53 {
54 std::pair<replication_channel_map::iterator, bool> map_ret =
55 rep_channel_map.insert(replication_channel_map::value_type(type, mi_map()));
56
57 if (!map_ret.second)
58 DBUG_RETURN(true);
59
60 map_it = rep_channel_map.find(type);
61 }
62
63 ret = map_it->second.insert(mi_map::value_type(channel_name, mi));
64
65 /* If a map insert fails, ret.second is false */
66 if(!ret.second)
67 DBUG_RETURN(true);
68
69 /* Save the pointer for the default_channel to avoid searching it */
70 if (!strcmp(channel_name, get_default_channel()))
71 default_channel_mi= mi;
72
73 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
74 res= add_mi_to_rpl_pfs_mi(mi);
75 #endif
76 current_mi_count++;
77
78 DBUG_RETURN(res);
79
80 }
81
get_mi(const char * channel_name)82 Master_info* Multisource_info::get_mi(const char* channel_name)
83 {
84 DBUG_ENTER("Multisource_info::get_mi");
85
86 m_channel_map_lock->assert_some_lock();
87
88 DBUG_ASSERT(channel_name != 0);
89
90 mi_map::iterator it;
91 replication_channel_map::iterator map_it;
92
93 map_it= rep_channel_map.find(SLAVE_REPLICATION_CHANNEL);
94 if (map_it != rep_channel_map.end())
95 {
96 it= map_it->second.find(channel_name);
97 }
98
99 if (map_it == rep_channel_map.end() || //If not a slave channel, maybe a group one
100 it == map_it->second.end())
101 {
102 map_it= rep_channel_map.find(GROUP_REPLICATION_CHANNEL);
103 if (map_it == rep_channel_map.end())
104 {
105 DBUG_RETURN(0);
106 }
107 it= map_it->second.find(channel_name);
108 if (it == map_it->second.end())
109 {
110 DBUG_RETURN(0);
111 }
112 }
113
114 DBUG_RETURN(it->second);
115 }
116
delete_mi(const char * channel_name)117 void Multisource_info::delete_mi(const char* channel_name)
118 {
119 DBUG_ENTER("Multisource_info::delete_mi");
120
121 m_channel_map_lock->assert_some_wrlock();
122
123 Master_info *mi= 0;
124 mi_map::iterator it;
125
126 DBUG_ASSERT(channel_name != 0);
127
128 replication_channel_map::iterator map_it;
129 map_it= rep_channel_map.find(SLAVE_REPLICATION_CHANNEL);
130
131 if (map_it != rep_channel_map.end())
132 {
133 it= map_it->second.find(channel_name);
134 }
135 if (map_it == rep_channel_map.end() || //If not a slave channel, maybe a group one
136 it == map_it->second.end())
137 {
138 map_it= rep_channel_map.find(GROUP_REPLICATION_CHANNEL);
139 DBUG_ASSERT(map_it != rep_channel_map.end());
140
141 it= map_it->second.find(channel_name);
142 DBUG_ASSERT(it != map_it->second.end());
143 }
144
145 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
146 int index= -1;
147 /* get the index of mi from rpl_pfs_mi */
148 index= get_index_from_rpl_pfs_mi(channel_name);
149
150 DBUG_ASSERT(index != -1);
151
152 /* set the current index to 0 and decrease current_mi_count */
153 rpl_pfs_mi[index] = 0;
154 #endif
155
156 current_mi_count--;
157
158 mi= it->second;
159 it->second= 0;
160 /* erase from the map */
161 map_it->second.erase(it);
162
163 if (default_channel_mi == mi)
164 default_channel_mi= NULL;
165
166 /* delete the master info */
167 if (mi)
168 {
169 mi->channel_assert_some_wrlock();
170 mi->wait_until_no_reference(current_thd);
171
172 if(mi->rli)
173 {
174 delete mi->rli;
175 }
176 delete mi;
177 }
178
179 DBUG_VOID_RETURN;
180 }
181
182
is_group_replication_channel_name(const char * channel,bool is_applier)183 bool Multisource_info::is_group_replication_channel_name(const char* channel,
184 bool is_applier)
185 {
186 if (is_applier)
187 return !strcmp(channel, group_replication_channel_names[0]);
188 else
189 return !strcmp(channel, group_replication_channel_names[0]) ||
190 !strcmp(channel, group_replication_channel_names[1]);
191 }
192
193
194 #ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
195
add_mi_to_rpl_pfs_mi(Master_info * mi)196 bool Multisource_info::add_mi_to_rpl_pfs_mi(Master_info *mi)
197 {
198 DBUG_ENTER("Multisource_info::add_mi_to_rpl_pfs_mi");
199
200 m_channel_map_lock->assert_some_wrlock();
201
202 bool res=true; // not added
203
204 /* Point to this added mi in the rpl_pfs_mi*/
205 for (uint i = 0; i < MAX_CHANNELS; i++)
206 {
207 if (rpl_pfs_mi[i] == 0)
208 {
209 rpl_pfs_mi[i] = mi;
210 res= false; // success
211 break;
212 }
213 }
214 DBUG_RETURN(res);
215 }
216
217
get_index_from_rpl_pfs_mi(const char * channel_name)218 int Multisource_info::get_index_from_rpl_pfs_mi(const char * channel_name)
219 {
220 m_channel_map_lock->assert_some_lock();
221
222 Master_info* mi= 0;
223 for (uint i= 0; i < MAX_CHANNELS; i++)
224 {
225 mi= rpl_pfs_mi[i];
226 if (mi)
227 {
228 if ( !strcmp(mi->get_channel(), channel_name))
229 return i;
230 }
231 }
232 return -1;
233 }
234
235
get_mi_at_pos(uint pos)236 Master_info* Multisource_info::get_mi_at_pos(uint pos)
237 {
238 DBUG_ENTER("Multisource_info::get_mi_at_pos");
239
240 m_channel_map_lock->assert_some_lock();
241
242 if ( pos < MAX_CHANNELS)
243 DBUG_RETURN(rpl_pfs_mi[pos]);
244
245 DBUG_RETURN(0);
246 }
247 #endif /*WITH_PERFSCHEMA_STORAGE_ENGINE */
248
249 /* There is only one channel_map for the whole server */
250 Multisource_info channel_map;
251