1 /*
2 Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24 /**
25 @file storage/perfschema/table_replication_connection_configuration.cc
26 Table replication_connection_configuration (implementation).
27 */
28
29 #define HAVE_REPLICATION
30
31 #include "my_global.h"
32 #include "table_replication_connection_configuration.h"
33 #include "pfs_instr_class.h"
34 #include "pfs_instr.h"
35 #include "rpl_slave.h"
36 #include "rpl_info.h"
37 #include "rpl_rli.h"
38 #include "rpl_mi.h"
39 #include "sql_parse.h"
40 #include "rpl_msr.h" /* Multisource replication */
41
42 THR_LOCK table_replication_connection_configuration::m_table_lock;
43
44 /* Numbers in varchar count utf8 characters. */
45 static const TABLE_FIELD_TYPE field_types[]=
46 {
47 {
48 {C_STRING_WITH_LEN("CHANNEL_NAME")},
49 {C_STRING_WITH_LEN("char(64)")},
50 {NULL, 0}
51 },
52 {
53 {C_STRING_WITH_LEN("HOST")},
54 {C_STRING_WITH_LEN("char(60)")},
55 {NULL, 0}
56 },
57 {
58 {C_STRING_WITH_LEN("PORT")},
59 {C_STRING_WITH_LEN("int(11)")},
60 {NULL, 0}
61 },
62 {
63 {C_STRING_WITH_LEN("USER")},
64 {C_STRING_WITH_LEN("char(" USERNAME_CHAR_LENGTH_STR ")")},
65 {NULL, 0}
66 },
67 {
68 {C_STRING_WITH_LEN("NETWORK_INTERFACE")},
69 {C_STRING_WITH_LEN("char(60)")},
70 {NULL, 0}
71 },
72 {
73 {C_STRING_WITH_LEN("AUTO_POSITION")},
74 {C_STRING_WITH_LEN("enum('1','0')")},
75 {NULL, 0}
76 },
77 {
78 {C_STRING_WITH_LEN("SSL_ALLOWED")},
79 {C_STRING_WITH_LEN("enum('YES','NO','IGNORED')")},
80 {NULL, 0}
81 },
82 {
83 {C_STRING_WITH_LEN("SSL_CA_FILE")},
84 {C_STRING_WITH_LEN("varchar(512)")},
85 {NULL, 0}
86 },
87 {
88 {C_STRING_WITH_LEN("SSL_CA_PATH")},
89 {C_STRING_WITH_LEN("varchar(512)")},
90 {NULL, 0}
91 },
92 {
93 {C_STRING_WITH_LEN("SSL_CERTIFICATE")},
94 {C_STRING_WITH_LEN("varchar(512)")},
95 {NULL, 0}
96 },
97 {
98 {C_STRING_WITH_LEN("SSL_CIPHER")},
99 {C_STRING_WITH_LEN("varchar(512)")},
100 {NULL, 0}
101 },
102 {
103 {C_STRING_WITH_LEN("SSL_KEY")},
104 {C_STRING_WITH_LEN("varchar(512)")},
105 {NULL, 0}
106 },
107 {
108 {C_STRING_WITH_LEN("SSL_VERIFY_SERVER_CERTIFICATE")},
109 {C_STRING_WITH_LEN("enum('YES','NO')")},
110 {NULL, 0}
111 },
112 {
113 {C_STRING_WITH_LEN("SSL_CRL_FILE")},
114 {C_STRING_WITH_LEN("varchar(255)")},
115 {NULL, 0}
116 },
117 {
118 {C_STRING_WITH_LEN("SSL_CRL_PATH")},
119 {C_STRING_WITH_LEN("varchar(255)")},
120 {NULL, 0}
121 },
122 {
123 {C_STRING_WITH_LEN("CONNECTION_RETRY_INTERVAL")},
124 {C_STRING_WITH_LEN("int(11)")},
125 {NULL, 0}
126 },
127 {
128 {C_STRING_WITH_LEN("CONNECTION_RETRY_COUNT")},
129 {C_STRING_WITH_LEN("bigint")},
130 {NULL, 0}
131 },
132 {
133 {C_STRING_WITH_LEN("HEARTBEAT_INTERVAL")},
134 {C_STRING_WITH_LEN("double(10,3)")},
135 {NULL, 0}
136 },
137 {
138 {C_STRING_WITH_LEN("TLS_VERSION")},
139 {C_STRING_WITH_LEN("varchar(255)")},
140 {NULL, 0}
141 }
142 };
143
144 TABLE_FIELD_DEF
145 table_replication_connection_configuration::m_field_def=
146 { 19, field_types };
147
148 PFS_engine_table_share
149 table_replication_connection_configuration::m_share=
150 {
151 { C_STRING_WITH_LEN("replication_connection_configuration") },
152 &pfs_readonly_acl,
153 table_replication_connection_configuration::create,
154 NULL, /* write_row */
155 NULL, /* delete_all_rows */
156 table_replication_connection_configuration::get_row_count, /* records */
157 sizeof(pos_t), /* ref length */
158 &m_table_lock,
159 &m_field_def,
160 false, /* checked */
161 false /* perpetual */
162 };
163
164
create(void)165 PFS_engine_table* table_replication_connection_configuration::create(void)
166 {
167 return new table_replication_connection_configuration();
168 }
169
170 table_replication_connection_configuration
table_replication_connection_configuration()171 ::table_replication_connection_configuration()
172 : PFS_engine_table(&m_share, &m_pos),
173 m_row_exists(false), m_pos(0), m_next_pos(0)
174 {}
175
176 table_replication_connection_configuration
~table_replication_connection_configuration()177 ::~table_replication_connection_configuration()
178 {}
179
reset_position(void)180 void table_replication_connection_configuration::reset_position(void)
181 {
182 m_pos.m_index= 0;
183 m_next_pos.m_index= 0;
184 }
185
get_row_count()186 ha_rows table_replication_connection_configuration::get_row_count()
187 {
188 /*
189 We actually give the MAX_CHANNELS rather than the current
190 number of channels
191 */
192
193 return channel_map.get_max_channels();
194 }
195
rnd_next(void)196 int table_replication_connection_configuration::rnd_next(void)
197 {
198 Master_info *mi;
199 channel_map.rdlock();
200
201 for (m_pos.set_at(&m_next_pos);
202 m_pos.m_index < channel_map.get_max_channels();
203 m_pos.next())
204 {
205 mi= channel_map.get_mi_at_pos(m_pos.m_index);
206
207 if (mi && mi->host[0])
208 {
209 make_row(mi);
210 m_next_pos.set_after(&m_pos);
211 channel_map.unlock();
212 return 0;
213 }
214 }
215
216 channel_map.unlock();
217 return HA_ERR_END_OF_FILE;
218 }
219
rnd_pos(const void * pos)220 int table_replication_connection_configuration::rnd_pos(const void *pos)
221 {
222 Master_info *mi;
223 int res= HA_ERR_RECORD_DELETED;
224
225 channel_map.rdlock();
226
227 set_position(pos);
228
229 if ((mi= channel_map.get_mi_at_pos(m_pos.m_index)))
230 {
231 make_row(mi);
232 res= 0;
233 }
234
235 channel_map.unlock();
236 return res;
237 }
238
make_row(Master_info * mi)239 void table_replication_connection_configuration::make_row(Master_info *mi)
240 {
241 char * temp_store;
242
243 m_row_exists= false;
244
245
246 assert(mi != NULL);
247
248 mysql_mutex_lock(&mi->data_lock);
249 mysql_mutex_lock(&mi->rli->data_lock);
250
251 m_row.channel_name_length= strlen(mi->get_channel());
252 memcpy(m_row.channel_name, (char*)mi->get_channel(), m_row.channel_name_length);
253
254 m_row.host_length= strlen(mi->host);
255 memcpy(m_row.host, mi->host, m_row.host_length);
256
257 m_row.port= (unsigned int) mi->port;
258
259 /* can't the user be NULL? */
260 temp_store= (char*)mi->get_user();
261 m_row.user_length= strlen(temp_store);
262 memcpy(m_row.user, temp_store, m_row.user_length);
263
264 temp_store= (char*)mi->bind_addr;
265 m_row.network_interface_length= strlen(temp_store);
266 memcpy(m_row.network_interface, temp_store, m_row.network_interface_length);
267
268 if (mi->is_auto_position())
269 m_row.auto_position= PS_RPL_YES;
270 else
271 m_row.auto_position= PS_RPL_NO;
272
273 #ifdef HAVE_OPENSSL
274 m_row.ssl_allowed= mi->ssl? PS_SSL_ALLOWED_YES:PS_SSL_ALLOWED_NO;
275 #else
276 m_row.ssl_allowed= mi->ssl? PS_SSL_ALLOWED_IGNORED:PS_SSL_ALLOWED_NO;
277 #endif
278
279 temp_store= (char*)mi->ssl_ca;
280 m_row.ssl_ca_file_length= strlen(temp_store);
281 memcpy(m_row.ssl_ca_file, temp_store, m_row.ssl_ca_file_length);
282
283 temp_store= (char*)mi->ssl_capath;
284 m_row.ssl_ca_path_length= strlen(temp_store);
285 memcpy(m_row.ssl_ca_path, temp_store, m_row.ssl_ca_path_length);
286
287 temp_store= (char*)mi->ssl_cert;
288 m_row.ssl_certificate_length= strlen(temp_store);
289 memcpy(m_row.ssl_certificate, temp_store, m_row.ssl_certificate_length);
290
291 temp_store= (char*)mi->ssl_cipher;
292 m_row.ssl_cipher_length= strlen(temp_store);
293 memcpy(m_row.ssl_cipher, temp_store, m_row.ssl_cipher_length);
294
295 temp_store= (char*)mi->ssl_key;
296 m_row.ssl_key_length= strlen(temp_store);
297 memcpy(m_row.ssl_key, temp_store, m_row.ssl_key_length);
298
299 if (mi->ssl_verify_server_cert)
300 m_row.ssl_verify_server_certificate= PS_RPL_YES;
301 else
302 m_row.ssl_verify_server_certificate= PS_RPL_NO;
303
304 temp_store= (char*)mi->ssl_crl;
305 m_row.ssl_crl_file_length= strlen(temp_store);
306 memcpy(m_row.ssl_crl_file, temp_store, m_row.ssl_crl_file_length);
307
308 temp_store= (char*)mi->ssl_crlpath;
309 m_row.ssl_crl_path_length= strlen(temp_store);
310 memcpy(m_row.ssl_crl_path, temp_store, m_row.ssl_crl_path_length);
311
312 m_row.connection_retry_interval= (unsigned int) mi->connect_retry;
313
314 m_row.connection_retry_count= (ulong) mi->retry_count;
315
316 m_row.heartbeat_interval= (double)mi->heartbeat_period;
317
318 temp_store= (char*)mi->tls_version;
319 m_row.tls_version_length= strlen(temp_store);
320 memcpy(m_row.tls_version, temp_store, m_row.tls_version_length);
321
322 mysql_mutex_unlock(&mi->rli->data_lock);
323 mysql_mutex_unlock(&mi->data_lock);
324
325 m_row_exists= true;
326 }
327
read_row_values(TABLE * table,unsigned char *,Field ** fields,bool read_all)328 int table_replication_connection_configuration::read_row_values(TABLE *table,
329 unsigned char *,
330 Field **fields,
331 bool read_all)
332 {
333 Field *f;
334
335 if (unlikely(! m_row_exists))
336 return HA_ERR_RECORD_DELETED;
337
338 assert(table->s->null_bytes == 0);
339
340 for (; (f= *fields) ; fields++)
341 {
342 if (read_all || bitmap_is_set(table->read_set, f->field_index))
343 {
344 switch(f->field_index)
345 {
346 case 0: /** channel_name */
347 set_field_char_utf8(f, m_row.channel_name, m_row.channel_name_length);
348 break;
349 case 1: /** host */
350 set_field_char_utf8(f, m_row.host, m_row.host_length);
351 break;
352 case 2: /** port */
353 set_field_ulong(f, m_row.port);
354 break;
355 case 3: /** user */
356 set_field_char_utf8(f, m_row.user, m_row.user_length);
357 break;
358 case 4: /** network_interface */
359 set_field_char_utf8(f, m_row.network_interface,
360 m_row.network_interface_length);
361 break;
362 case 5: /** auto_position */
363 set_field_enum(f, m_row.auto_position);
364 break;
365 case 6: /** ssl_allowed */
366 set_field_enum(f, m_row. ssl_allowed);
367 break;
368 case 7: /**ssl_ca_file */
369 set_field_varchar_utf8(f, m_row.ssl_ca_file,
370 m_row.ssl_ca_file_length);
371 break;
372 case 8: /** ssl_ca_path */
373 set_field_varchar_utf8(f, m_row.ssl_ca_path,
374 m_row.ssl_ca_path_length);
375 break;
376 case 9: /** ssl_certificate */
377 set_field_varchar_utf8(f, m_row.ssl_certificate,
378 m_row.ssl_certificate_length);
379 break;
380 case 10: /** ssl_cipher */
381 set_field_varchar_utf8(f, m_row.ssl_cipher, m_row.ssl_cipher_length);
382 break;
383 case 11: /** ssl_key */
384 set_field_varchar_utf8(f, m_row.ssl_key, m_row.ssl_key_length);
385 break;
386 case 12: /** ssl_verify_server_certificate */
387 set_field_enum(f, m_row.ssl_verify_server_certificate);
388 break;
389 case 13: /** ssl_crl_file */
390 set_field_varchar_utf8(f, m_row.ssl_crl_file,
391 m_row.ssl_crl_file_length);
392 break;
393 case 14: /** ssl_crl_path */
394 set_field_varchar_utf8(f, m_row.ssl_crl_path,
395 m_row.ssl_crl_path_length);
396 break;
397 case 15: /** connection_retry_interval */
398 set_field_ulong(f, m_row.connection_retry_interval);
399 break;
400 case 16: /** connect_retry_count */
401 set_field_ulonglong(f, m_row.connection_retry_count);
402 break;
403 case 17:/** number of seconds after which heartbeat will be sent */
404 set_field_double(f, m_row.heartbeat_interval);
405 break;
406 case 18: /** tls_version */
407 set_field_varchar_utf8(f, m_row.tls_version,
408 m_row.tls_version_length);
409 break;
410 default:
411 assert(false);
412 }
413 }
414 }
415 return 0;
416 }
417