1 /*
2 Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24 /**
25 @file storage/perfschema/table_replication_applier_status_by_cordinator.cc
26 Table replication_applier_status_by_coordinator (implementation).
27 */
28
29 #define HAVE_REPLICATION
30
31 #include "my_global.h"
32 #include "table_replication_applier_status_by_coordinator.h"
33 #include "pfs_instr_class.h"
34 #include "pfs_instr.h"
35 #include "rpl_slave.h"
36 #include "rpl_info.h"
37 #include "rpl_rli.h"
38 #include "rpl_mi.h"
39 #include "sql_parse.h"
40 #include "rpl_msr.h" /* Multisource replication */
41
/* Lock object for this table; m_share refers to it through &m_table_lock. */
THR_LOCK table_replication_applier_status_by_coordinator::m_table_lock;
43
44 /*
45 numbers in varchar count utf8 characters.
46 */
/*
  Column definitions of the table, listed in the same order as the
  field_index cases handled by read_row_values().
  Each entry gives the column name, its SQL type string, and a third
  member that is left as {NULL, 0} for every column.
*/
static const TABLE_FIELD_TYPE field_types[]=
{

  /* column 0 */
  {
    {C_STRING_WITH_LEN("CHANNEL_NAME")},
    {C_STRING_WITH_LEN("char(64)")},
    {NULL, 0}
  },
  /* column 1 */
  {
    {C_STRING_WITH_LEN("THREAD_ID")},
    {C_STRING_WITH_LEN("bigint")},
    {NULL, 0}
  },
  /* column 2 */
  {
    {C_STRING_WITH_LEN("SERVICE_STATE")},
    {C_STRING_WITH_LEN("enum('ON','OFF')")},
    {NULL, 0}
  },
  /* column 3 */
  {
    {C_STRING_WITH_LEN("LAST_ERROR_NUMBER")},
    {C_STRING_WITH_LEN("int(11)")},
    {NULL, 0}
  },
  /* column 4 */
  {
    {C_STRING_WITH_LEN("LAST_ERROR_MESSAGE")},
    {C_STRING_WITH_LEN("varchar(1024)")},
    {NULL, 0}
  },
  /* column 5 */
  {
    {C_STRING_WITH_LEN("LAST_ERROR_TIMESTAMP")},
    { C_STRING_WITH_LEN("timestamp") },
    { NULL, 0}
  },
};
81
82 TABLE_FIELD_DEF
83 table_replication_applier_status_by_coordinator::m_field_def=
84 { 6, field_types };
85
/*
  Share object describing this table. The initializer is positional; the
  comment next to each slot names what is passed there.
*/
PFS_engine_table_share
table_replication_applier_status_by_coordinator::m_share=
{
  { C_STRING_WITH_LEN("replication_applier_status_by_coordinator") }, /* table name */
  &pfs_readonly_acl, /* ACL object */
  table_replication_applier_status_by_coordinator::create, /* factory, see create() */
  NULL, /* write_row */
  NULL, /* delete_all_rows */
  table_replication_applier_status_by_coordinator::get_row_count, /* row count callback */
  sizeof(pos_t), /* ref length */
  &m_table_lock, /* table lock object */
  &m_field_def, /* column definitions */
  false, /* checked */
  false /* perpetual */
};
101
create(void)102 PFS_engine_table* table_replication_applier_status_by_coordinator::create(void)
103 {
104 return new table_replication_applier_status_by_coordinator();
105 }
106
107 table_replication_applier_status_by_coordinator
table_replication_applier_status_by_coordinator()108 ::table_replication_applier_status_by_coordinator()
109 : PFS_engine_table(&m_share, &m_pos),
110 m_row_exists(false), m_pos(0), m_next_pos(0)
111 {}
112
/** Destructor; the body is empty. */
table_replication_applier_status_by_coordinator
::~table_replication_applier_status_by_coordinator()
{}
116
reset_position(void)117 void table_replication_applier_status_by_coordinator::reset_position(void)
118 {
119 m_pos.m_index= 0;
120 m_next_pos.m_index= 0;
121 }
122
get_row_count()123 ha_rows table_replication_applier_status_by_coordinator::get_row_count()
124 {
125 return channel_map.get_max_channels();
126 }
127
128
rnd_next(void)129 int table_replication_applier_status_by_coordinator::rnd_next(void)
130 {
131 Master_info *mi;
132 channel_map.rdlock();
133
134 for(m_pos.set_at(&m_next_pos);
135 m_pos.m_index < channel_map.get_max_channels();
136 m_pos.next())
137 {
138 mi= channel_map.get_mi_at_pos(m_pos.m_index);
139
140 /*
141 Construct and display SQL Thread's (Coordinator) information in
142 'replication_applier_status_by_coordinator' table only in the case of
143 multi threaded slave mode. Code should do nothing in the case of single
144 threaded slave mode. In case of single threaded slave mode SQL Thread's
145 status will be reported as part of
146 'replication_applier_status_by_worker' table.
147 */
148 if (mi && mi->host[0] && mi->rli && mi->rli->get_worker_count() > 0)
149 {
150 make_row(mi);
151 m_next_pos.set_after(&m_pos);
152 channel_map.unlock();
153 return 0;
154 }
155 }
156
157 channel_map.unlock();
158 return HA_ERR_END_OF_FILE;
159 }
160
rnd_pos(const void * pos)161 int table_replication_applier_status_by_coordinator::rnd_pos(const void *pos)
162 {
163 Master_info *mi=NULL;
164 int res= HA_ERR_RECORD_DELETED;
165
166 set_position(pos);
167
168 channel_map.rdlock();
169
170 if ((mi= channel_map.get_mi_at_pos(m_pos.m_index)))
171 {
172 make_row(mi);
173 res= 0;
174 }
175
176 channel_map.unlock();
177 return res;
178 }
179
make_row(Master_info * mi)180 void table_replication_applier_status_by_coordinator::make_row(Master_info *mi)
181 {
182 m_row_exists= false;
183
184 assert(mi != NULL);
185 assert(mi->rli != NULL);
186
187 mysql_mutex_lock(&mi->rli->data_lock);
188
189 m_row.channel_name_length= strlen(mi->get_channel());
190 memcpy(m_row.channel_name, (char*)mi->get_channel(), m_row.channel_name_length);
191
192 if (mi->rli->slave_running)
193 {
194 PSI_thread *psi= thd_get_psi(mi->rli->info_thd);
195 PFS_thread *pfs= reinterpret_cast<PFS_thread *> (psi);
196 if(pfs)
197 {
198 m_row.thread_id= pfs->m_thread_internal_id;
199 m_row.thread_id_is_null= false;
200 }
201 else
202 m_row.thread_id_is_null= true;
203 }
204 else
205 m_row.thread_id_is_null= true;
206
207 if (mi->rli->slave_running)
208 m_row.service_state= PS_RPL_YES;
209 else
210 m_row.service_state= PS_RPL_NO;
211
212 mysql_mutex_lock(&mi->rli->err_lock);
213
214 m_row.last_error_number= (long int) mi->rli->last_error().number;
215 m_row.last_error_message_length= 0;
216 m_row.last_error_timestamp= 0;
217
218 /** if error, set error message and timestamp */
219 if (m_row.last_error_number)
220 {
221 char *temp_store= (char*) mi->rli->last_error().message;
222 m_row.last_error_message_length= strlen(temp_store);
223 memcpy(m_row.last_error_message, temp_store,
224 m_row.last_error_message_length);
225
226 /** time in millisecond since epoch */
227 m_row.last_error_timestamp= (ulonglong)mi->rli->last_error().skr*1000000;
228 }
229
230 mysql_mutex_unlock(&mi->rli->err_lock);
231 mysql_mutex_unlock(&mi->rli->data_lock);
232
233 m_row_exists= true;
234 }
235
236 int table_replication_applier_status_by_coordinator
read_row_values(TABLE * table,unsigned char * buf,Field ** fields,bool read_all)237 ::read_row_values(TABLE *table, unsigned char *buf,
238 Field **fields, bool read_all)
239 {
240 Field *f;
241
242 if (unlikely(! m_row_exists))
243 return HA_ERR_RECORD_DELETED;
244
245 assert(table->s->null_bytes == 1);
246 buf[0]= 0;
247
248 for (; (f= *fields) ; fields++)
249 {
250 if (read_all || bitmap_is_set(table->read_set, f->field_index))
251 {
252 switch(f->field_index)
253 {
254 case 0: /* channel_name */
255 set_field_char_utf8(f, m_row.channel_name, m_row.channel_name_length);
256 break;
257 case 1: /*thread_id*/
258 if (!m_row.thread_id_is_null)
259 set_field_ulonglong(f, m_row.thread_id);
260 else
261 f->set_null();
262 break;
263 case 2: /*service_state*/
264 set_field_enum(f, m_row.service_state);
265 break;
266 case 3: /*last_error_number*/
267 set_field_ulong(f, m_row.last_error_number);
268 break;
269 case 4: /*last_error_message*/
270 set_field_varchar_utf8(f, m_row.last_error_message,
271 m_row.last_error_message_length);
272 break;
273 case 5: /*last_error_timestamp*/
274 set_field_timestamp(f, m_row.last_error_timestamp);
275 break;
276 default:
277 assert(false);
278 }
279 }
280 }
281 return 0;
282 }
283