1 /*
2 Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24 /**
25 @file storage/perfschema/table_replication_connection_status.cc
26 Table replication_connection_status (implementation).
27 */
28
29 #define HAVE_REPLICATION
30
31 #include "my_global.h"
32 #include "table_replication_connection_status.h"
33 #include "pfs_instr_class.h"
34 #include "pfs_instr.h"
35 #include "rpl_slave.h"
36 #include "rpl_info.h"
37 #include "rpl_rli.h"
38 #include "rpl_mi.h"
39 #include "sql_parse.h"
40 #include "rpl_msr.h" /* Multi source replication */
41 #include "log.h"
42 #include "rpl_group_replication.h"
43
44 /*
45 Callbacks implementation for GROUP_REPLICATION_CONNECTION_STATUS_CALLBACKS.
46 */
/**
  Channel-name callback for GROUP_REPLICATION_CONNECTION_STATUS_CALLBACKS.

  Intentionally does nothing: make_row() copies the channel name from the
  Master_info itself, so the value handed in here is ignored.

  @param context  opaque row pointer (unused)
  @param value    first character of the channel name (unused)
  @param length   number of characters in the name (unused)
*/
static void set_channel_name(void* const context, const char& value,
                             size_t length)
{
  (void) context;
  (void) value;
  (void) length;
}
51
set_group_name(void * const context,const char & value,size_t length)52 static void set_group_name(void* const context, const char& value,
53 size_t length)
54 {
55 struct st_row_connect_status* row=
56 static_cast<struct st_row_connect_status*>(context);
57 const size_t max= UUID_LENGTH;
58 length= std::min(length, max);
59
60 row->group_name_is_null= false;
61 memcpy(row->group_name, &value, length);
62 }
63
set_source_uuid(void * const context,const char & value,size_t length)64 static void set_source_uuid(void* const context, const char& value,
65 size_t length)
66 {
67 struct st_row_connect_status* row=
68 static_cast<struct st_row_connect_status*>(context);
69 const size_t max= UUID_LENGTH;
70 length= std::min(length, max);
71
72 row->source_uuid_is_null= false;
73 memcpy(row->source_uuid, &value, length);
74 }
75
set_service_state(void * const context,bool value)76 static void set_service_state(void* const context, bool value)
77 {
78 struct st_row_connect_status* row=
79 static_cast<struct st_row_connect_status*>(context);
80
81 row->service_state= value ? PS_RPL_CONNECT_SERVICE_STATE_YES
82 : PS_RPL_CONNECT_SERVICE_STATE_NO;
83 }
84
85
86 THR_LOCK table_replication_connection_status::m_table_lock;
87
88
89 /* Numbers in varchar count utf8 characters. */
90 static const TABLE_FIELD_TYPE field_types[]=
91 {
92 {
93 {C_STRING_WITH_LEN("CHANNEL_NAME")},
94 {C_STRING_WITH_LEN("char(64)")},
95 {NULL, 0}
96 },
97 {
98 {C_STRING_WITH_LEN("GROUP_NAME")},
99 {C_STRING_WITH_LEN("char(36)")},
100 {NULL, 0}
101 },
102 {
103 {C_STRING_WITH_LEN("SOURCE_UUID")},
104 {C_STRING_WITH_LEN("char(36)")},
105 {NULL, 0}
106 },
107 {
108 {C_STRING_WITH_LEN("THREAD_ID")},
109 {C_STRING_WITH_LEN("bigint(20)")},
110 {NULL, 0}
111 },
112 {
113 {C_STRING_WITH_LEN("SERVICE_STATE")},
114 {C_STRING_WITH_LEN("enum('ON','OFF','CONNECTING')")},
115 {NULL, 0}
116 },
117 {
118 {C_STRING_WITH_LEN("COUNT_RECEIVED_HEARTBEATS")},
119 {C_STRING_WITH_LEN("bigint(20)")},
120 {NULL, 0}
121 },
122 {
123 {C_STRING_WITH_LEN("LAST_HEARTBEAT_TIMESTAMP")},
124 {C_STRING_WITH_LEN("timestamp")},
125 {NULL, 0}
126 },
127 {
128 {C_STRING_WITH_LEN("RECEIVED_TRANSACTION_SET")},
129 {C_STRING_WITH_LEN("longtext")},
130 {NULL, 0}
131 },
132 {
133 {C_STRING_WITH_LEN("LAST_ERROR_NUMBER")},
134 {C_STRING_WITH_LEN("int(11)")},
135 {NULL, 0}
136 },
137 {
138 {C_STRING_WITH_LEN("LAST_ERROR_MESSAGE")},
139 {C_STRING_WITH_LEN("varchar(1024)")},
140 {NULL, 0}
141 },
142 {
143 {C_STRING_WITH_LEN("LAST_ERROR_TIMESTAMP")},
144 {C_STRING_WITH_LEN("timestamp")},
145 {NULL, 0}
146 }
147 };
148
149 TABLE_FIELD_DEF
150 table_replication_connection_status::m_field_def= { 11, field_types };
151
152 PFS_engine_table_share
153 table_replication_connection_status::m_share=
154 {
155 { C_STRING_WITH_LEN("replication_connection_status") },
156 &pfs_readonly_acl,
157 table_replication_connection_status::create,
158 NULL, /* write_row */
159 NULL, /* delete_all_rows */
160 table_replication_connection_status::get_row_count, /* records */
161 sizeof(pos_t), /* ref length */
162 &m_table_lock,
163 &m_field_def,
164 false, /* checked */
165 false /* perpetual */
166 };
167
create(void)168 PFS_engine_table* table_replication_connection_status::create(void)
169 {
170 return new table_replication_connection_status();
171 }
172
table_replication_connection_status()173 table_replication_connection_status::table_replication_connection_status()
174 : PFS_engine_table(&m_share, &m_pos),
175 m_row_exists(false), m_pos(0), m_next_pos(0)
176 {
177 }
178
~table_replication_connection_status()179 table_replication_connection_status::~table_replication_connection_status()
180 {
181 }
182
reset_position(void)183 void table_replication_connection_status::reset_position(void)
184 {
185 m_pos.m_index= 0;
186 m_next_pos.m_index= 0;
187 }
188
get_row_count()189 ha_rows table_replication_connection_status::get_row_count()
190 {
191 /*A lock is not needed for an estimate */
192 return channel_map.get_max_channels();
193 }
194
195
196
rnd_next(void)197 int table_replication_connection_status::rnd_next(void)
198 {
199 Master_info *mi= NULL;
200 channel_map.rdlock();
201
202 for (m_pos.set_at(&m_next_pos);
203 m_pos.m_index < channel_map.get_max_channels();
204 m_pos.next())
205 {
206 mi= channel_map.get_mi_at_pos(m_pos.m_index);
207
208 if (mi && mi->host[0])
209 {
210 make_row(mi);
211 m_next_pos.set_after(&m_pos);
212 channel_map.unlock();
213 return 0;
214 }
215 }
216
217 channel_map.unlock();
218 return HA_ERR_END_OF_FILE;
219 }
220
rnd_pos(const void * pos)221 int table_replication_connection_status::rnd_pos(const void *pos)
222 {
223 Master_info *mi;
224 int res= HA_ERR_RECORD_DELETED;
225
226 set_position(pos);
227
228 channel_map.rdlock();
229
230 if ((mi= channel_map.get_mi_at_pos(m_pos.m_index)))
231 {
232 make_row(mi);
233 res= 0;
234 }
235
236 channel_map.unlock();
237 return res;
238 }
239
make_row(Master_info * mi)240 void table_replication_connection_status::make_row(Master_info *mi)
241 {
242 DBUG_ENTER("table_replication_connection_status::make_row");
243 m_row_exists= false;
244 bool error= false;
245
246 /* Default values */
247 m_row.group_name_is_null= true;
248 m_row.source_uuid_is_null= true;
249 m_row.thread_id_is_null= true;
250 m_row.service_state= PS_RPL_CONNECT_SERVICE_STATE_NO;
251
252 assert(mi != NULL);
253 assert(mi->rli != NULL);
254
255 mysql_mutex_lock(&mi->data_lock);
256 mysql_mutex_lock(&mi->rli->data_lock);
257
258 m_row.channel_name_length= mi->get_channel() ? strlen(mi->get_channel()):0;
259 memcpy(m_row.channel_name, mi->get_channel(), m_row.channel_name_length);
260
261 if (is_group_replication_plugin_loaded() &&
262 channel_map.is_group_replication_channel_name(mi->get_channel(), true))
263 {
264 /*
265 Group Replication applier channel.
266 Set callbacks on GROUP_REPLICATION_GROUP_MEMBER_STATS_CALLBACKS.
267 */
268 const GROUP_REPLICATION_CONNECTION_STATUS_CALLBACKS callbacks=
269 {
270 &m_row,
271 &set_channel_name,
272 &set_group_name,
273 &set_source_uuid,
274 &set_service_state,
275 };
276
277 // Query plugin and let callbacks do their job.
278 if (get_group_replication_connection_status_info(callbacks))
279 {
280 DBUG_PRINT("info", ("Group Replication stats not available!"));
281 }
282 }
283 else
284 {
285 /* Slave channel. */
286 if (mi->master_uuid[0] != 0)
287 {
288 memcpy(m_row.source_uuid, mi->master_uuid, UUID_LENGTH);
289 m_row.source_uuid_is_null= false;
290 }
291
292 if (mi->slave_running == MYSQL_SLAVE_RUN_CONNECT)
293 m_row.service_state= PS_RPL_CONNECT_SERVICE_STATE_YES;
294 else
295 {
296 if (mi->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT)
297 m_row.service_state= PS_RPL_CONNECT_SERVICE_STATE_CONNECTING;
298 else
299 m_row.service_state= PS_RPL_CONNECT_SERVICE_STATE_NO;
300 }
301 }
302
303 if (mi->slave_running == MYSQL_SLAVE_RUN_CONNECT)
304 {
305 PSI_thread *psi= thd_get_psi(mi->info_thd);
306 PFS_thread *pfs= reinterpret_cast<PFS_thread *> (psi);
307 if(pfs)
308 {
309 m_row.thread_id= pfs->m_thread_internal_id;
310 m_row.thread_id_is_null= false;
311 }
312 }
313
314 m_row.count_received_heartbeats= mi->received_heartbeats;
315 /*
316 Time in Milliseconds since epoch. active_mi->last_heartbeat contains
317 number of seconds so we multiply by 1000000.
318 */
319 m_row.last_heartbeat_timestamp= (ulonglong)mi->last_heartbeat*1000000;
320
321 {
322 global_sid_lock->wrlock();
323 const Gtid_set* io_gtid_set= mi->rli->get_gtid_set();
324
325 if ((m_row.received_transaction_set_length=
326 io_gtid_set->to_string(&m_row.received_transaction_set)) < 0)
327 {
328 my_free(m_row.received_transaction_set);
329 m_row.received_transaction_set_length= 0;
330 global_sid_lock->unlock();
331 error= true;
332 goto end;
333 }
334 global_sid_lock->unlock();
335 }
336
337 /* Errors */
338 mysql_mutex_lock(&mi->err_lock);
339 mysql_mutex_lock(&mi->rli->err_lock);
340 m_row.last_error_number= (unsigned int) mi->last_error().number;
341 m_row.last_error_message_length= 0;
342 m_row.last_error_timestamp= 0;
343
344 /** If error, set error message and timestamp */
345 if (m_row.last_error_number)
346 {
347 char* temp_store= (char*)mi->last_error().message;
348 m_row.last_error_message_length= strlen(temp_store);
349 memcpy(m_row.last_error_message, temp_store,
350 m_row.last_error_message_length);
351
352 /*
353 Time in millisecond since epoch. active_mi->last_error().skr contains
354 number of seconds so we multiply by 1000000. */
355 m_row.last_error_timestamp= (ulonglong)mi->last_error().skr*1000000;
356 }
357 mysql_mutex_unlock(&mi->rli->err_lock);
358 mysql_mutex_unlock(&mi->err_lock);
359
360 end:
361 mysql_mutex_unlock(&mi->rli->data_lock);
362 mysql_mutex_unlock(&mi->data_lock);
363
364 if (!error)
365 m_row_exists= true;
366 DBUG_VOID_RETURN;
367 }
368
read_row_values(TABLE * table,unsigned char * buf,Field ** fields,bool read_all)369 int table_replication_connection_status::read_row_values(TABLE *table,
370 unsigned char *buf,
371 Field **fields,
372 bool read_all)
373 {
374 Field *f;
375
376 if (unlikely(! m_row_exists))
377 return HA_ERR_RECORD_DELETED;
378
379 assert(table->s->null_bytes == 1);
380 buf[0]= 0;
381
382 for (; (f= *fields) ; fields++)
383 {
384 if (read_all || bitmap_is_set(table->read_set, f->field_index))
385 {
386 switch(f->field_index)
387 {
388 case 0: /** channel_name*/
389 set_field_char_utf8(f, m_row.channel_name,m_row.channel_name_length);
390 break;
391 case 1: /** group_name */
392 if (m_row.group_name_is_null)
393 f->set_null();
394 else
395 set_field_char_utf8(f, m_row.group_name, UUID_LENGTH);
396 break;
397 case 2: /** source_uuid */
398 if (m_row.source_uuid_is_null)
399 f->set_null();
400 else
401 set_field_char_utf8(f, m_row.source_uuid, UUID_LENGTH);
402 break;
403 case 3: /** thread_id */
404 if(m_row.thread_id_is_null)
405 f->set_null();
406 else
407 set_field_ulonglong(f, m_row.thread_id);
408 break;
409 case 4: /** service_state */
410 set_field_enum(f, m_row.service_state);
411 break;
412 case 5: /** number of heartbeat events received **/
413 set_field_ulonglong(f, m_row.count_received_heartbeats);
414 break;
415 case 6: /** time of receipt of last heartbeat event **/
416 set_field_timestamp(f, m_row.last_heartbeat_timestamp);
417 break;
418 case 7: /** received_transaction_set */
419 set_field_longtext_utf8(f, m_row.received_transaction_set,
420 m_row.received_transaction_set_length);
421 break;
422 case 8: /*last_error_number*/
423 set_field_ulong(f, m_row.last_error_number);
424 break;
425 case 9: /*last_error_message*/
426 set_field_varchar_utf8(f, m_row.last_error_message,
427 m_row.last_error_message_length);
428 break;
429 case 10: /*last_error_timestamp*/
430 set_field_timestamp(f, m_row.last_error_timestamp);
431 break;
432 default:
433 assert(false);
434 }
435 }
436 }
437 m_row.cleanup();
438
439 return 0;
440 }
441