1 /*
2       Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3 
4       This program is free software; you can redistribute it and/or modify
5       it under the terms of the GNU General Public License, version 2.0,
6       as published by the Free Software Foundation.
7 
8       This program is also distributed with certain software (including
9       but not limited to OpenSSL) that is licensed under separate terms,
10       as designated in a particular file or component or in included license
11       documentation.  The authors of MySQL hereby grant you an additional
12       permission to link the program and your derivative works with the
13       separately licensed software that they have included with MySQL.
14 
15       This program is distributed in the hope that it will be useful,
16       but WITHOUT ANY WARRANTY; without even the implied warranty of
17       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18       GNU General Public License, version 2.0, for more details.
19 
20       You should have received a copy of the GNU General Public License
21       along with this program; if not, write to the Free Software
22       Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
23 
24 /**
25   @file storage/perfschema/table_replication_applier_status_by_cordinator.cc
26   Table replication_applier_status_by_coordinator (implementation).
27 */
28 
29 #define HAVE_REPLICATION
30 
31 #include "my_global.h"
32 #include "table_replication_applier_status_by_coordinator.h"
33 #include "pfs_instr_class.h"
34 #include "pfs_instr.h"
35 #include "rpl_slave.h"
36 #include "rpl_info.h"
37 #include  "rpl_rli.h"
38 #include "rpl_mi.h"
39 #include "sql_parse.h"
40 #include "rpl_msr.h"       /* Multisource replication */
41 
42 THR_LOCK table_replication_applier_status_by_coordinator::m_table_lock;
43 
44 /*
45   numbers in varchar count utf8 characters.
46 */
47 static const TABLE_FIELD_TYPE field_types[]=
48 {
49 
50   {
51     {C_STRING_WITH_LEN("CHANNEL_NAME")},
52     {C_STRING_WITH_LEN("char(64)")},
53     {NULL, 0}
54   },
55   {
56     {C_STRING_WITH_LEN("THREAD_ID")},
57     {C_STRING_WITH_LEN("bigint")},
58     {NULL, 0}
59   },
60   {
61     {C_STRING_WITH_LEN("SERVICE_STATE")},
62     {C_STRING_WITH_LEN("enum('ON','OFF')")},
63     {NULL, 0}
64   },
65   {
66     {C_STRING_WITH_LEN("LAST_ERROR_NUMBER")},
67     {C_STRING_WITH_LEN("int(11)")},
68     {NULL, 0}
69   },
70   {
71     {C_STRING_WITH_LEN("LAST_ERROR_MESSAGE")},
72     {C_STRING_WITH_LEN("varchar(1024)")},
73     {NULL, 0}
74   },
75   {
76     {C_STRING_WITH_LEN("LAST_ERROR_TIMESTAMP")},
77     { C_STRING_WITH_LEN("timestamp") },
78     { NULL, 0}
79   },
80 };
81 
82 TABLE_FIELD_DEF
83 table_replication_applier_status_by_coordinator::m_field_def=
84 { 6, field_types };
85 
86 PFS_engine_table_share
87 table_replication_applier_status_by_coordinator::m_share=
88 {
89   { C_STRING_WITH_LEN("replication_applier_status_by_coordinator") },
90   &pfs_readonly_acl,
91   table_replication_applier_status_by_coordinator::create,
92   NULL, /* write_row */
93   NULL, /* delete_all_rows */
94   table_replication_applier_status_by_coordinator::get_row_count,
95   sizeof(pos_t), /* ref length */
96   &m_table_lock,
97   &m_field_def,
98   false, /* checked */
99   false  /* perpetual */
100 };
101 
create(void)102 PFS_engine_table* table_replication_applier_status_by_coordinator::create(void)
103 {
104   return new table_replication_applier_status_by_coordinator();
105 }
106 
107 table_replication_applier_status_by_coordinator
table_replication_applier_status_by_coordinator()108   ::table_replication_applier_status_by_coordinator()
109   : PFS_engine_table(&m_share, &m_pos),
110     m_row_exists(false), m_pos(0), m_next_pos(0)
111 {}
112 
113 table_replication_applier_status_by_coordinator
~table_replication_applier_status_by_coordinator()114   ::~table_replication_applier_status_by_coordinator()
115 {}
116 
reset_position(void)117 void table_replication_applier_status_by_coordinator::reset_position(void)
118 {
119   m_pos.m_index= 0;
120   m_next_pos.m_index= 0;
121 }
122 
get_row_count()123 ha_rows table_replication_applier_status_by_coordinator::get_row_count()
124 {
125  return channel_map.get_max_channels();
126 }
127 
128 
rnd_next(void)129 int table_replication_applier_status_by_coordinator::rnd_next(void)
130 {
131   Master_info *mi;
132   channel_map.rdlock();
133 
134   for(m_pos.set_at(&m_next_pos);
135       m_pos.m_index < channel_map.get_max_channels();
136       m_pos.next())
137   {
138     mi= channel_map.get_mi_at_pos(m_pos.m_index);
139 
140     /*
141       Construct and display SQL Thread's (Coordinator) information in
142       'replication_applier_status_by_coordinator' table only in the case of
143       multi threaded slave mode. Code should do nothing in the case of single
144       threaded slave mode. In case of single threaded slave mode SQL Thread's
145       status will be reported as part of
146       'replication_applier_status_by_worker' table.
147     */
148     if (mi && mi->host[0] && mi->rli && mi->rli->get_worker_count() > 0)
149     {
150       make_row(mi);
151       m_next_pos.set_after(&m_pos);
152       channel_map.unlock();
153       return 0;
154     }
155   }
156 
157   channel_map.unlock();
158   return HA_ERR_END_OF_FILE;
159 }
160 
rnd_pos(const void * pos)161 int table_replication_applier_status_by_coordinator::rnd_pos(const void *pos)
162 {
163   Master_info *mi=NULL;
164   int res= HA_ERR_RECORD_DELETED;
165 
166   set_position(pos);
167 
168   channel_map.rdlock();
169 
170   if ((mi= channel_map.get_mi_at_pos(m_pos.m_index)))
171   {
172     make_row(mi);
173     res= 0;
174   }
175 
176   channel_map.unlock();
177   return res;
178 }
179 
make_row(Master_info * mi)180 void table_replication_applier_status_by_coordinator::make_row(Master_info *mi)
181 {
182   m_row_exists= false;
183 
184   assert(mi != NULL);
185   assert(mi->rli != NULL);
186 
187   mysql_mutex_lock(&mi->rli->data_lock);
188 
189   m_row.channel_name_length= strlen(mi->get_channel());
190   memcpy(m_row.channel_name, (char*)mi->get_channel(), m_row.channel_name_length);
191 
192   if (mi->rli->slave_running)
193   {
194     PSI_thread *psi= thd_get_psi(mi->rli->info_thd);
195     PFS_thread *pfs= reinterpret_cast<PFS_thread *> (psi);
196     if(pfs)
197     {
198       m_row.thread_id= pfs->m_thread_internal_id;
199       m_row.thread_id_is_null= false;
200     }
201     else
202       m_row.thread_id_is_null= true;
203   }
204   else
205     m_row.thread_id_is_null= true;
206 
207   if (mi->rli->slave_running)
208     m_row.service_state= PS_RPL_YES;
209   else
210     m_row.service_state= PS_RPL_NO;
211 
212   mysql_mutex_lock(&mi->rli->err_lock);
213 
214   m_row.last_error_number= (long int) mi->rli->last_error().number;
215   m_row.last_error_message_length= 0;
216   m_row.last_error_timestamp= 0;
217 
218   /** if error, set error message and timestamp */
219   if (m_row.last_error_number)
220   {
221     char *temp_store= (char*) mi->rli->last_error().message;
222     m_row.last_error_message_length= strlen(temp_store);
223     memcpy(m_row.last_error_message, temp_store,
224            m_row.last_error_message_length);
225 
226     /** time in millisecond since epoch */
227     m_row.last_error_timestamp= (ulonglong)mi->rli->last_error().skr*1000000;
228   }
229 
230   mysql_mutex_unlock(&mi->rli->err_lock);
231   mysql_mutex_unlock(&mi->rli->data_lock);
232 
233   m_row_exists= true;
234 }
235 
236 int table_replication_applier_status_by_coordinator
read_row_values(TABLE * table,unsigned char * buf,Field ** fields,bool read_all)237   ::read_row_values(TABLE *table, unsigned char *buf,
238                     Field **fields, bool read_all)
239 {
240   Field *f;
241 
242   if (unlikely(! m_row_exists))
243     return HA_ERR_RECORD_DELETED;
244 
245   assert(table->s->null_bytes == 1);
246   buf[0]= 0;
247 
248   for (; (f= *fields) ; fields++)
249   {
250     if (read_all || bitmap_is_set(table->read_set, f->field_index))
251     {
252       switch(f->field_index)
253       {
254       case 0: /* channel_name */
255          set_field_char_utf8(f, m_row.channel_name, m_row.channel_name_length);
256          break;
257       case 1: /*thread_id*/
258         if (!m_row.thread_id_is_null)
259           set_field_ulonglong(f, m_row.thread_id);
260         else
261           f->set_null();
262         break;
263       case 2: /*service_state*/
264         set_field_enum(f, m_row.service_state);
265         break;
266       case 3: /*last_error_number*/
267         set_field_ulong(f, m_row.last_error_number);
268         break;
269       case 4: /*last_error_message*/
270         set_field_varchar_utf8(f, m_row.last_error_message,
271                                m_row.last_error_message_length);
272         break;
273       case 5: /*last_error_timestamp*/
274         set_field_timestamp(f, m_row.last_error_timestamp);
275         break;
276       default:
277         assert(false);
278       }
279     }
280   }
281   return 0;
282 }
283