1 /*
2       Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3 
4       This program is free software; you can redistribute it and/or modify
5       it under the terms of the GNU General Public License, version 2.0,
6       as published by the Free Software Foundation.
7 
8       This program is also distributed with certain software (including
9       but not limited to OpenSSL) that is licensed under separate terms,
10       as designated in a particular file or component or in included license
11       documentation.  The authors of MySQL hereby grant you an additional
12       permission to link the program and your derivative works with the
13       separately licensed software that they have included with MySQL.
14 
15       This program is distributed in the hope that it will be useful,
16       but WITHOUT ANY WARRANTY; without even the implied warranty of
17       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18       GNU General Public License, version 2.0, for more details.
19 
20       You should have received a copy of the GNU General Public License
21       along with this program; if not, write to the Free Software
22       Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
23 
24 /**
25   @file storage/perfschema/table_replication_connection_status.cc
26   Table replication_connection_status (implementation).
27 */
28 
29 #define HAVE_REPLICATION
30 
31 #include "my_global.h"
32 #include "table_replication_connection_status.h"
33 #include "pfs_instr_class.h"
34 #include "pfs_instr.h"
35 #include "rpl_slave.h"
36 #include "rpl_info.h"
37 #include  "rpl_rli.h"
38 #include "rpl_mi.h"
39 #include "sql_parse.h"
40 #include "rpl_msr.h"           /* Multi source replication */
41 #include "log.h"
42 #include "rpl_group_replication.h"
43 
44 /*
45   Callbacks implementation for GROUP_REPLICATION_CONNECTION_STATUS_CALLBACKS.
46 */
/**
  Channel-name callback handed to the Group Replication plugin.

  Deliberately a no-op: make_row() copies the channel name straight from the
  Master_info before the callbacks are invoked, so nothing is stored here.

  @param context  opaque callback context (ignored)
  @param value    reference to the reported name bytes (ignored)
  @param length   reported name length (ignored)
*/
static void set_channel_name(void* const context, const char& value,
                             size_t length)
{
  /* Discard every argument explicitly; this callback records nothing. */
  (void) context;
  (void) value;
  (void) length;
}
51 
set_group_name(void * const context,const char & value,size_t length)52 static void set_group_name(void* const context, const char& value,
53                            size_t length)
54 {
55   struct st_row_connect_status* row=
56       static_cast<struct st_row_connect_status*>(context);
57   const size_t max= UUID_LENGTH;
58   length= std::min(length, max);
59 
60   row->group_name_is_null= false;
61   memcpy(row->group_name, &value, length);
62 }
63 
set_source_uuid(void * const context,const char & value,size_t length)64 static void set_source_uuid(void* const context, const char& value,
65                             size_t length)
66 {
67   struct st_row_connect_status* row=
68       static_cast<struct st_row_connect_status*>(context);
69   const size_t max= UUID_LENGTH;
70   length= std::min(length, max);
71 
72   row->source_uuid_is_null= false;
73   memcpy(row->source_uuid, &value, length);
74 }
75 
set_service_state(void * const context,bool value)76 static void set_service_state(void* const context, bool value)
77 {
78   struct st_row_connect_status* row=
79       static_cast<struct st_row_connect_status*>(context);
80 
81   row->service_state= value ? PS_RPL_CONNECT_SERVICE_STATE_YES
82                             : PS_RPL_CONNECT_SERVICE_STATE_NO;
83 }
84 
85 
86 THR_LOCK table_replication_connection_status::m_table_lock;
87 
88 
89 /* Numbers in varchar count utf8 characters. */
90 static const TABLE_FIELD_TYPE field_types[]=
91 {
92   {
93     {C_STRING_WITH_LEN("CHANNEL_NAME")},
94     {C_STRING_WITH_LEN("char(64)")},
95     {NULL, 0}
96   },
97   {
98     {C_STRING_WITH_LEN("GROUP_NAME")},
99     {C_STRING_WITH_LEN("char(36)")},
100     {NULL, 0}
101   },
102   {
103     {C_STRING_WITH_LEN("SOURCE_UUID")},
104     {C_STRING_WITH_LEN("char(36)")},
105     {NULL, 0}
106   },
107   {
108     {C_STRING_WITH_LEN("THREAD_ID")},
109     {C_STRING_WITH_LEN("bigint(20)")},
110     {NULL, 0}
111   },
112   {
113     {C_STRING_WITH_LEN("SERVICE_STATE")},
114     {C_STRING_WITH_LEN("enum('ON','OFF','CONNECTING')")},
115     {NULL, 0}
116   },
117   {
118     {C_STRING_WITH_LEN("COUNT_RECEIVED_HEARTBEATS")},
119     {C_STRING_WITH_LEN("bigint(20)")},
120     {NULL, 0}
121   },
122   {
123     {C_STRING_WITH_LEN("LAST_HEARTBEAT_TIMESTAMP")},
124     {C_STRING_WITH_LEN("timestamp")},
125     {NULL, 0}
126   },
127   {
128     {C_STRING_WITH_LEN("RECEIVED_TRANSACTION_SET")},
129     {C_STRING_WITH_LEN("longtext")},
130     {NULL, 0}
131   },
132   {
133     {C_STRING_WITH_LEN("LAST_ERROR_NUMBER")},
134     {C_STRING_WITH_LEN("int(11)")},
135     {NULL, 0}
136   },
137   {
138     {C_STRING_WITH_LEN("LAST_ERROR_MESSAGE")},
139     {C_STRING_WITH_LEN("varchar(1024)")},
140     {NULL, 0}
141   },
142   {
143     {C_STRING_WITH_LEN("LAST_ERROR_TIMESTAMP")},
144     {C_STRING_WITH_LEN("timestamp")},
145     {NULL, 0}
146   }
147 };
148 
149 TABLE_FIELD_DEF
150 table_replication_connection_status::m_field_def= { 11, field_types };
151 
152 PFS_engine_table_share
153 table_replication_connection_status::m_share=
154 {
155   { C_STRING_WITH_LEN("replication_connection_status") },
156   &pfs_readonly_acl,
157   table_replication_connection_status::create,
158   NULL, /* write_row */
159   NULL, /* delete_all_rows */
160   table_replication_connection_status::get_row_count, /* records */
161   sizeof(pos_t), /* ref length */
162   &m_table_lock,
163   &m_field_def,
164   false, /* checked */
165   false  /* perpetual */
166 };
167 
create(void)168 PFS_engine_table* table_replication_connection_status::create(void)
169 {
170   return new table_replication_connection_status();
171 }
172 
table_replication_connection_status()173 table_replication_connection_status::table_replication_connection_status()
174   : PFS_engine_table(&m_share, &m_pos),
175     m_row_exists(false), m_pos(0), m_next_pos(0)
176 {
177 }
178 
~table_replication_connection_status()179 table_replication_connection_status::~table_replication_connection_status()
180 {
181 }
182 
reset_position(void)183 void table_replication_connection_status::reset_position(void)
184 {
185   m_pos.m_index= 0;
186   m_next_pos.m_index= 0;
187 }
188 
get_row_count()189 ha_rows table_replication_connection_status::get_row_count()
190 {
191   /*A lock is not needed for an estimate */
192   return channel_map.get_max_channels();
193 }
194 
195 
196 
rnd_next(void)197 int table_replication_connection_status::rnd_next(void)
198 {
199   Master_info *mi= NULL;
200   channel_map.rdlock();
201 
202   for (m_pos.set_at(&m_next_pos);
203        m_pos.m_index < channel_map.get_max_channels();
204        m_pos.next())
205   {
206     mi= channel_map.get_mi_at_pos(m_pos.m_index);
207 
208     if (mi && mi->host[0])
209     {
210       make_row(mi);
211       m_next_pos.set_after(&m_pos);
212       channel_map.unlock();
213       return 0;
214     }
215   }
216 
217   channel_map.unlock();
218   return HA_ERR_END_OF_FILE;
219 }
220 
rnd_pos(const void * pos)221 int table_replication_connection_status::rnd_pos(const void *pos)
222 {
223   Master_info *mi;
224   int res= HA_ERR_RECORD_DELETED;
225 
226   set_position(pos);
227 
228   channel_map.rdlock();
229 
230   if ((mi= channel_map.get_mi_at_pos(m_pos.m_index)))
231   {
232     make_row(mi);
233     res= 0;
234   }
235 
236   channel_map.unlock();
237   return res;
238 }
239 
make_row(Master_info * mi)240 void table_replication_connection_status::make_row(Master_info *mi)
241 {
242   DBUG_ENTER("table_replication_connection_status::make_row");
243   m_row_exists= false;
244   bool error= false;
245 
246   /* Default values */
247   m_row.group_name_is_null= true;
248   m_row.source_uuid_is_null= true;
249   m_row.thread_id_is_null= true;
250   m_row.service_state= PS_RPL_CONNECT_SERVICE_STATE_NO;
251 
252   assert(mi != NULL);
253   assert(mi->rli != NULL);
254 
255   mysql_mutex_lock(&mi->data_lock);
256   mysql_mutex_lock(&mi->rli->data_lock);
257 
258   m_row.channel_name_length= mi->get_channel() ? strlen(mi->get_channel()):0;
259   memcpy(m_row.channel_name, mi->get_channel(), m_row.channel_name_length);
260 
261   if (is_group_replication_plugin_loaded() &&
262       channel_map.is_group_replication_channel_name(mi->get_channel(), true))
263   {
264     /*
265       Group Replication applier channel.
266       Set callbacks on GROUP_REPLICATION_GROUP_MEMBER_STATS_CALLBACKS.
267     */
268     const GROUP_REPLICATION_CONNECTION_STATUS_CALLBACKS callbacks=
269     {
270       &m_row,
271       &set_channel_name,
272       &set_group_name,
273       &set_source_uuid,
274       &set_service_state,
275     };
276 
277     // Query plugin and let callbacks do their job.
278     if (get_group_replication_connection_status_info(callbacks))
279     {
280       DBUG_PRINT("info", ("Group Replication stats not available!"));
281     }
282   }
283   else
284   {
285     /* Slave channel. */
286     if (mi->master_uuid[0] != 0)
287     {
288       memcpy(m_row.source_uuid, mi->master_uuid, UUID_LENGTH);
289       m_row.source_uuid_is_null= false;
290     }
291 
292     if (mi->slave_running == MYSQL_SLAVE_RUN_CONNECT)
293       m_row.service_state= PS_RPL_CONNECT_SERVICE_STATE_YES;
294     else
295     {
296       if (mi->slave_running == MYSQL_SLAVE_RUN_NOT_CONNECT)
297         m_row.service_state= PS_RPL_CONNECT_SERVICE_STATE_CONNECTING;
298       else
299         m_row.service_state= PS_RPL_CONNECT_SERVICE_STATE_NO;
300     }
301   }
302 
303   if (mi->slave_running == MYSQL_SLAVE_RUN_CONNECT)
304   {
305     PSI_thread *psi= thd_get_psi(mi->info_thd);
306     PFS_thread *pfs= reinterpret_cast<PFS_thread *> (psi);
307     if(pfs)
308     {
309       m_row.thread_id= pfs->m_thread_internal_id;
310       m_row.thread_id_is_null= false;
311     }
312   }
313 
314   m_row.count_received_heartbeats= mi->received_heartbeats;
315   /*
316     Time in Milliseconds since epoch. active_mi->last_heartbeat contains
317     number of seconds so we multiply by 1000000.
318   */
319   m_row.last_heartbeat_timestamp= (ulonglong)mi->last_heartbeat*1000000;
320 
321   {
322     global_sid_lock->wrlock();
323     const Gtid_set* io_gtid_set= mi->rli->get_gtid_set();
324 
325     if ((m_row.received_transaction_set_length=
326          io_gtid_set->to_string(&m_row.received_transaction_set)) < 0)
327     {
328       my_free(m_row.received_transaction_set);
329       m_row.received_transaction_set_length= 0;
330       global_sid_lock->unlock();
331       error= true;
332       goto end;
333     }
334     global_sid_lock->unlock();
335   }
336 
337   /* Errors */
338   mysql_mutex_lock(&mi->err_lock);
339   mysql_mutex_lock(&mi->rli->err_lock);
340   m_row.last_error_number= (unsigned int) mi->last_error().number;
341   m_row.last_error_message_length= 0;
342   m_row.last_error_timestamp= 0;
343 
344   /** If error, set error message and timestamp */
345   if (m_row.last_error_number)
346   {
347     char* temp_store= (char*)mi->last_error().message;
348     m_row.last_error_message_length= strlen(temp_store);
349     memcpy(m_row.last_error_message, temp_store,
350            m_row.last_error_message_length);
351 
352     /*
353       Time in millisecond since epoch. active_mi->last_error().skr contains
354       number of seconds so we multiply by 1000000. */
355     m_row.last_error_timestamp= (ulonglong)mi->last_error().skr*1000000;
356   }
357   mysql_mutex_unlock(&mi->rli->err_lock);
358   mysql_mutex_unlock(&mi->err_lock);
359 
360 end:
361   mysql_mutex_unlock(&mi->rli->data_lock);
362   mysql_mutex_unlock(&mi->data_lock);
363 
364   if (!error)
365     m_row_exists= true;
366   DBUG_VOID_RETURN;
367 }
368 
read_row_values(TABLE * table,unsigned char * buf,Field ** fields,bool read_all)369 int table_replication_connection_status::read_row_values(TABLE *table,
370                                                          unsigned char *buf,
371                                                          Field **fields,
372                                                          bool read_all)
373 {
374   Field *f;
375 
376   if (unlikely(! m_row_exists))
377     return HA_ERR_RECORD_DELETED;
378 
379   assert(table->s->null_bytes == 1);
380   buf[0]= 0;
381 
382   for (; (f= *fields) ; fields++)
383   {
384     if (read_all || bitmap_is_set(table->read_set, f->field_index))
385     {
386       switch(f->field_index)
387       {
388       case 0: /** channel_name*/
389         set_field_char_utf8(f, m_row.channel_name,m_row.channel_name_length);
390         break;
391       case 1: /** group_name */
392         if (m_row.group_name_is_null)
393           f->set_null();
394         else
395           set_field_char_utf8(f, m_row.group_name, UUID_LENGTH);
396         break;
397       case 2: /** source_uuid */
398         if (m_row.source_uuid_is_null)
399           f->set_null();
400         else
401           set_field_char_utf8(f, m_row.source_uuid, UUID_LENGTH);
402         break;
403       case 3: /** thread_id */
404         if(m_row.thread_id_is_null)
405           f->set_null();
406         else
407           set_field_ulonglong(f, m_row.thread_id);
408         break;
409       case 4: /** service_state */
410         set_field_enum(f, m_row.service_state);
411         break;
412       case 5: /** number of heartbeat events received **/
413         set_field_ulonglong(f, m_row.count_received_heartbeats);
414         break;
415       case 6: /** time of receipt of last heartbeat event **/
416         set_field_timestamp(f, m_row.last_heartbeat_timestamp);
417         break;
418       case 7: /** received_transaction_set */
419         set_field_longtext_utf8(f, m_row.received_transaction_set,
420                                 m_row.received_transaction_set_length);
421         break;
422       case 8: /*last_error_number*/
423         set_field_ulong(f, m_row.last_error_number);
424         break;
425       case 9: /*last_error_message*/
426         set_field_varchar_utf8(f, m_row.last_error_message,
427                                m_row.last_error_message_length);
428         break;
429       case 10: /*last_error_timestamp*/
430         set_field_timestamp(f, m_row.last_error_timestamp);
431         break;
432       default:
433         assert(false);
434       }
435     }
436   }
437   m_row.cleanup();
438 
439   return 0;
440 }
441