1 /*
2       Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3 
4       This program is free software; you can redistribute it and/or modify
5       it under the terms of the GNU General Public License, version 2.0,
6       as published by the Free Software Foundation.
7 
8       This program is also distributed with certain software (including
9       but not limited to OpenSSL) that is licensed under separate terms,
10       as designated in a particular file or component or in included license
11       documentation.  The authors of MySQL hereby grant you an additional
12       permission to link the program and your derivative works with the
13       separately licensed software that they have included with MySQL.
14 
15       This program is distributed in the hope that it will be useful,
16       but WITHOUT ANY WARRANTY; without even the implied warranty of
17       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18       GNU General Public License, version 2.0, for more details.
19 
20       You should have received a copy of the GNU General Public License
21       along with this program; if not, write to the Free Software
22       Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
23 
24 /**
25   @file storage/perfschema/table_replication_connection_configuration.cc
26   Table replication_connection_configuration (implementation).
27 */
28 
29 #define HAVE_REPLICATION
30 
31 #include "my_global.h"
32 #include "table_replication_connection_configuration.h"
33 #include "pfs_instr_class.h"
34 #include "pfs_instr.h"
35 #include "rpl_slave.h"
36 #include "rpl_info.h"
37 #include "rpl_rli.h"
38 #include "rpl_mi.h"
39 #include "sql_parse.h"
40 #include "rpl_msr.h"             /* Multisource replciation */
41 
42 THR_LOCK table_replication_connection_configuration::m_table_lock;
43 
44 /* Numbers in varchar count utf8 characters. */
45 static const TABLE_FIELD_TYPE field_types[]=
46 {
47   {
48     {C_STRING_WITH_LEN("CHANNEL_NAME")},
49     {C_STRING_WITH_LEN("char(64)")},
50     {NULL, 0}
51   },
52   {
53     {C_STRING_WITH_LEN("HOST")},
54     {C_STRING_WITH_LEN("char(60)")},
55     {NULL, 0}
56   },
57   {
58     {C_STRING_WITH_LEN("PORT")},
59     {C_STRING_WITH_LEN("int(11)")},
60     {NULL, 0}
61   },
62   {
63     {C_STRING_WITH_LEN("USER")},
64     {C_STRING_WITH_LEN("char(" USERNAME_CHAR_LENGTH_STR ")")},
65     {NULL, 0}
66   },
67   {
68     {C_STRING_WITH_LEN("NETWORK_INTERFACE")},
69     {C_STRING_WITH_LEN("char(60)")},
70     {NULL, 0}
71   },
72   {
73     {C_STRING_WITH_LEN("AUTO_POSITION")},
74     {C_STRING_WITH_LEN("enum('1','0')")},
75     {NULL, 0}
76   },
77   {
78     {C_STRING_WITH_LEN("SSL_ALLOWED")},
79     {C_STRING_WITH_LEN("enum('YES','NO','IGNORED')")},
80     {NULL, 0}
81   },
82   {
83     {C_STRING_WITH_LEN("SSL_CA_FILE")},
84     {C_STRING_WITH_LEN("varchar(512)")},
85     {NULL, 0}
86   },
87   {
88     {C_STRING_WITH_LEN("SSL_CA_PATH")},
89     {C_STRING_WITH_LEN("varchar(512)")},
90     {NULL, 0}
91   },
92   {
93     {C_STRING_WITH_LEN("SSL_CERTIFICATE")},
94     {C_STRING_WITH_LEN("varchar(512)")},
95     {NULL, 0}
96   },
97   {
98     {C_STRING_WITH_LEN("SSL_CIPHER")},
99     {C_STRING_WITH_LEN("varchar(512)")},
100     {NULL, 0}
101   },
102   {
103     {C_STRING_WITH_LEN("SSL_KEY")},
104     {C_STRING_WITH_LEN("varchar(512)")},
105     {NULL, 0}
106   },
107   {
108     {C_STRING_WITH_LEN("SSL_VERIFY_SERVER_CERTIFICATE")},
109     {C_STRING_WITH_LEN("enum('YES','NO')")},
110     {NULL, 0}
111   },
112   {
113     {C_STRING_WITH_LEN("SSL_CRL_FILE")},
114     {C_STRING_WITH_LEN("varchar(255)")},
115     {NULL, 0}
116   },
117   {
118     {C_STRING_WITH_LEN("SSL_CRL_PATH")},
119     {C_STRING_WITH_LEN("varchar(255)")},
120     {NULL, 0}
121   },
122   {
123     {C_STRING_WITH_LEN("CONNECTION_RETRY_INTERVAL")},
124     {C_STRING_WITH_LEN("int(11)")},
125     {NULL, 0}
126   },
127   {
128     {C_STRING_WITH_LEN("CONNECTION_RETRY_COUNT")},
129     {C_STRING_WITH_LEN("bigint")},
130     {NULL, 0}
131   },
132   {
133     {C_STRING_WITH_LEN("HEARTBEAT_INTERVAL")},
134     {C_STRING_WITH_LEN("double(10,3)")},
135     {NULL, 0}
136    },
137   {
138     {C_STRING_WITH_LEN("TLS_VERSION")},
139     {C_STRING_WITH_LEN("varchar(255)")},
140     {NULL, 0}
141   }
142 };
143 
144 TABLE_FIELD_DEF
145 table_replication_connection_configuration::m_field_def=
146 { 19, field_types };
147 
148 PFS_engine_table_share
149 table_replication_connection_configuration::m_share=
150 {
151   { C_STRING_WITH_LEN("replication_connection_configuration") },
152   &pfs_readonly_acl,
153   table_replication_connection_configuration::create,
154   NULL, /* write_row */
155   NULL, /* delete_all_rows */
156   table_replication_connection_configuration::get_row_count, /* records */
157   sizeof(pos_t), /* ref length */
158   &m_table_lock,
159   &m_field_def,
160   false, /* checked */
161   false  /* perpetual */
162 };
163 
164 
create(void)165 PFS_engine_table* table_replication_connection_configuration::create(void)
166 {
167   return new table_replication_connection_configuration();
168 }
169 
170 table_replication_connection_configuration
table_replication_connection_configuration()171   ::table_replication_connection_configuration()
172   : PFS_engine_table(&m_share, &m_pos),
173     m_row_exists(false), m_pos(0), m_next_pos(0)
174 {}
175 
176 table_replication_connection_configuration
~table_replication_connection_configuration()177   ::~table_replication_connection_configuration()
178 {}
179 
reset_position(void)180 void table_replication_connection_configuration::reset_position(void)
181 {
182   m_pos.m_index= 0;
183   m_next_pos.m_index= 0;
184 }
185 
get_row_count()186 ha_rows table_replication_connection_configuration::get_row_count()
187 {
188   /*
189      We actually give the MAX_CHANNELS rather than the current
190      number of channels
191   */
192 
193  return channel_map.get_max_channels();
194 }
195 
rnd_next(void)196 int table_replication_connection_configuration::rnd_next(void)
197 {
198   Master_info *mi;
199   channel_map.rdlock();
200 
201   for (m_pos.set_at(&m_next_pos);
202        m_pos.m_index < channel_map.get_max_channels();
203        m_pos.next())
204   {
205     mi= channel_map.get_mi_at_pos(m_pos.m_index);
206 
207     if (mi && mi->host[0])
208     {
209       make_row(mi);
210       m_next_pos.set_after(&m_pos);
211       channel_map.unlock();
212       return 0;
213     }
214   }
215 
216   channel_map.unlock();
217   return HA_ERR_END_OF_FILE;
218 }
219 
rnd_pos(const void * pos)220 int table_replication_connection_configuration::rnd_pos(const void *pos)
221 {
222   Master_info *mi;
223   int res= HA_ERR_RECORD_DELETED;
224 
225   channel_map.rdlock();
226 
227   set_position(pos);
228 
229   if ((mi= channel_map.get_mi_at_pos(m_pos.m_index)))
230   {
231     make_row(mi);
232     res= 0;
233   }
234 
235   channel_map.unlock();
236   return res;
237 }
238 
make_row(Master_info * mi)239 void table_replication_connection_configuration::make_row(Master_info *mi)
240 {
241   char * temp_store;
242 
243   m_row_exists= false;
244 
245 
246   assert(mi != NULL);
247 
248   mysql_mutex_lock(&mi->data_lock);
249   mysql_mutex_lock(&mi->rli->data_lock);
250 
251   m_row.channel_name_length= strlen(mi->get_channel());
252   memcpy(m_row.channel_name, (char*)mi->get_channel(), m_row.channel_name_length);
253 
254   m_row.host_length= strlen(mi->host);
255   memcpy(m_row.host, mi->host, m_row.host_length);
256 
257   m_row.port= (unsigned int) mi->port;
258 
259   /* can't the user be NULL? */
260   temp_store= (char*)mi->get_user();
261   m_row.user_length= strlen(temp_store);
262   memcpy(m_row.user, temp_store, m_row.user_length);
263 
264   temp_store= (char*)mi->bind_addr;
265   m_row.network_interface_length= strlen(temp_store);
266   memcpy(m_row.network_interface, temp_store, m_row.network_interface_length);
267 
268   if (mi->is_auto_position())
269     m_row.auto_position= PS_RPL_YES;
270   else
271     m_row.auto_position= PS_RPL_NO;
272 
273 #ifdef HAVE_OPENSSL
274   m_row.ssl_allowed= mi->ssl? PS_SSL_ALLOWED_YES:PS_SSL_ALLOWED_NO;
275 #else
276   m_row.ssl_allowed= mi->ssl? PS_SSL_ALLOWED_IGNORED:PS_SSL_ALLOWED_NO;
277 #endif
278 
279   temp_store= (char*)mi->ssl_ca;
280   m_row.ssl_ca_file_length= strlen(temp_store);
281   memcpy(m_row.ssl_ca_file, temp_store, m_row.ssl_ca_file_length);
282 
283   temp_store= (char*)mi->ssl_capath;
284   m_row.ssl_ca_path_length= strlen(temp_store);
285   memcpy(m_row.ssl_ca_path, temp_store, m_row.ssl_ca_path_length);
286 
287   temp_store= (char*)mi->ssl_cert;
288   m_row.ssl_certificate_length= strlen(temp_store);
289   memcpy(m_row.ssl_certificate, temp_store, m_row.ssl_certificate_length);
290 
291   temp_store= (char*)mi->ssl_cipher;
292   m_row.ssl_cipher_length= strlen(temp_store);
293   memcpy(m_row.ssl_cipher, temp_store, m_row.ssl_cipher_length);
294 
295   temp_store= (char*)mi->ssl_key;
296   m_row.ssl_key_length= strlen(temp_store);
297   memcpy(m_row.ssl_key, temp_store, m_row.ssl_key_length);
298 
299   if (mi->ssl_verify_server_cert)
300     m_row.ssl_verify_server_certificate= PS_RPL_YES;
301   else
302     m_row.ssl_verify_server_certificate= PS_RPL_NO;
303 
304   temp_store= (char*)mi->ssl_crl;
305   m_row.ssl_crl_file_length= strlen(temp_store);
306   memcpy(m_row.ssl_crl_file, temp_store, m_row.ssl_crl_file_length);
307 
308   temp_store= (char*)mi->ssl_crlpath;
309   m_row.ssl_crl_path_length= strlen(temp_store);
310   memcpy(m_row.ssl_crl_path, temp_store, m_row.ssl_crl_path_length);
311 
312   m_row.connection_retry_interval= (unsigned int) mi->connect_retry;
313 
314   m_row.connection_retry_count= (ulong) mi->retry_count;
315 
316   m_row.heartbeat_interval= (double)mi->heartbeat_period;
317 
318   temp_store= (char*)mi->tls_version;
319   m_row.tls_version_length= strlen(temp_store);
320   memcpy(m_row.tls_version, temp_store, m_row.tls_version_length);
321 
322   mysql_mutex_unlock(&mi->rli->data_lock);
323   mysql_mutex_unlock(&mi->data_lock);
324 
325   m_row_exists= true;
326 }
327 
read_row_values(TABLE * table,unsigned char *,Field ** fields,bool read_all)328 int table_replication_connection_configuration::read_row_values(TABLE *table,
329                                                                 unsigned char *,
330                                                                 Field **fields,
331                                                                 bool read_all)
332 {
333   Field *f;
334 
335   if (unlikely(! m_row_exists))
336     return HA_ERR_RECORD_DELETED;
337 
338   assert(table->s->null_bytes == 0);
339 
340   for (; (f= *fields) ; fields++)
341   {
342     if (read_all || bitmap_is_set(table->read_set, f->field_index))
343     {
344       switch(f->field_index)
345       {
346       case 0: /** channel_name */
347         set_field_char_utf8(f, m_row.channel_name, m_row.channel_name_length);
348         break;
349       case 1: /** host */
350         set_field_char_utf8(f, m_row.host, m_row.host_length);
351         break;
352       case 2: /** port */
353         set_field_ulong(f, m_row.port);
354         break;
355       case 3: /** user */
356         set_field_char_utf8(f, m_row.user, m_row.user_length);
357         break;
358       case 4: /** network_interface */
359         set_field_char_utf8(f, m_row.network_interface,
360                                m_row.network_interface_length);
361         break;
362       case 5: /** auto_position */
363         set_field_enum(f, m_row.auto_position);
364         break;
365       case 6: /** ssl_allowed */
366         set_field_enum(f, m_row. ssl_allowed);
367         break;
368       case 7: /**ssl_ca_file */
369         set_field_varchar_utf8(f, m_row.ssl_ca_file,
370                                m_row.ssl_ca_file_length);
371         break;
372       case 8: /** ssl_ca_path */
373         set_field_varchar_utf8(f, m_row.ssl_ca_path,
374                                m_row.ssl_ca_path_length);
375         break;
376       case 9: /** ssl_certificate */
377         set_field_varchar_utf8(f, m_row.ssl_certificate,
378                                m_row.ssl_certificate_length);
379         break;
380       case 10: /** ssl_cipher */
381         set_field_varchar_utf8(f, m_row.ssl_cipher, m_row.ssl_cipher_length);
382         break;
383       case 11: /** ssl_key */
384         set_field_varchar_utf8(f, m_row.ssl_key, m_row.ssl_key_length);
385         break;
386       case 12: /** ssl_verify_server_certificate */
387         set_field_enum(f, m_row.ssl_verify_server_certificate);
388         break;
389       case 13: /** ssl_crl_file */
390         set_field_varchar_utf8(f, m_row.ssl_crl_file,
391                                m_row.ssl_crl_file_length);
392         break;
393       case 14: /** ssl_crl_path */
394         set_field_varchar_utf8(f, m_row.ssl_crl_path,
395                                m_row.ssl_crl_path_length);
396         break;
397       case 15: /** connection_retry_interval */
398         set_field_ulong(f, m_row.connection_retry_interval);
399         break;
400       case 16: /** connect_retry_count */
401         set_field_ulonglong(f, m_row.connection_retry_count);
402         break;
403       case 17:/** number of seconds after which heartbeat will be sent */
404         set_field_double(f, m_row.heartbeat_interval);
405         break;
406       case 18: /** tls_version */
407         set_field_varchar_utf8(f, m_row.tls_version,
408                                m_row.tls_version_length);
409         break;
410       default:
411         assert(false);
412       }
413     }
414   }
415   return 0;
416 }
417