1 /*
2 Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24 /**
25 @file storage/perfschema/table_replication_applier_status_by_worker.cc
26 Table replication_applier_status_by_worker (implementation).
27 */
28
29 #define HAVE_REPLICATION
30
31 #include "my_global.h"
32 #include "table_replication_applier_status_by_worker.h"
33 #include "pfs_instr_class.h"
34 #include "pfs_instr.h"
35 #include "rpl_slave.h"
36 #include "rpl_info.h"
37 #include "rpl_rli.h"
38 #include "rpl_mi.h"
39 #include "sql_parse.h"
40 #include "rpl_rli_pdb.h"
41 #include "rpl_msr.h" /*Multi source replication */
42
43 THR_LOCK table_replication_applier_status_by_worker::m_table_lock;
44
45 /* numbers in varchar count utf8 characters. */
46 static const TABLE_FIELD_TYPE field_types[]=
47 {
48
49 {
50 {C_STRING_WITH_LEN("CHANNEL_NAME")},
51 {C_STRING_WITH_LEN("char(64)")},
52 {NULL, 0}
53 },
54 {
55 {C_STRING_WITH_LEN("WORKER_ID")},
56 {C_STRING_WITH_LEN("bigint")},
57 {NULL, 0}
58 },
59 {
60 {C_STRING_WITH_LEN("THREAD_ID")},
61 {C_STRING_WITH_LEN("bigint")},
62 {NULL, 0}
63 },
64 {
65 {C_STRING_WITH_LEN("SERVICE_STATE")},
66 {C_STRING_WITH_LEN("enum('ON','OFF')")},
67 {NULL, 0}
68 },
69 {
70 {C_STRING_WITH_LEN("LAST_SEEN_TRANSACTION")},
71 {C_STRING_WITH_LEN("char(57)")},
72 {NULL, 0}
73 },
74 {
75 {C_STRING_WITH_LEN("LAST_ERROR_NUMBER")},
76 {C_STRING_WITH_LEN("int(11)")},
77 {NULL, 0}
78 },
79 {
80 {C_STRING_WITH_LEN("LAST_ERROR_MESSAGE")},
81 {C_STRING_WITH_LEN("varchar(1024)")},
82 {NULL, 0}
83 },
84 {
85 {C_STRING_WITH_LEN("LAST_ERROR_TIMESTAMP")},
86 {C_STRING_WITH_LEN("timestamp")},
87 {NULL, 0}
88 },
89 };
90
91 TABLE_FIELD_DEF
92 table_replication_applier_status_by_worker::m_field_def=
93 { 8, field_types };
94
95 PFS_engine_table_share
96 table_replication_applier_status_by_worker::m_share=
97 {
98 { C_STRING_WITH_LEN("replication_applier_status_by_worker") },
99 &pfs_readonly_acl,
100 table_replication_applier_status_by_worker::create,
101 NULL, /* write_row */
102 NULL, /* delete_all_rows */
103 table_replication_applier_status_by_worker::get_row_count, /*records*/
104 sizeof(pos_t), /* ref length */
105 &m_table_lock,
106 &m_field_def,
107 false, /* checked */
108 false /* perpetual */
109 };
110
create(void)111 PFS_engine_table* table_replication_applier_status_by_worker::create(void)
112 {
113 return new table_replication_applier_status_by_worker();
114 }
115
116 table_replication_applier_status_by_worker
table_replication_applier_status_by_worker()117 ::table_replication_applier_status_by_worker()
118 : PFS_engine_table(&m_share, &m_pos),
119 m_row_exists(false), m_pos(), m_next_pos()
120 {}
121
122 table_replication_applier_status_by_worker
~table_replication_applier_status_by_worker()123 ::~table_replication_applier_status_by_worker()
124 {}
125
reset_position(void)126 void table_replication_applier_status_by_worker::reset_position(void)
127 {
128 m_pos.reset();
129 m_next_pos.reset();
130 }
131
get_row_count()132 ha_rows table_replication_applier_status_by_worker::get_row_count()
133 {
134 /*
135 Return an estimate, number of master info's multipled by worker threads
136 */
137 return channel_map.get_max_channels()*32;
138 }
139
140
rnd_next(void)141 int table_replication_applier_status_by_worker::rnd_next(void)
142 {
143 Slave_worker *worker;
144 Master_info *mi;
145 size_t wc;
146
147 channel_map.rdlock();
148
149 for (m_pos.set_at(&m_next_pos);
150 m_pos.has_more_channels(channel_map.get_max_channels());
151 m_pos.next_channel())
152 {
153 mi= channel_map.get_mi_at_pos(m_pos.m_index_1);
154
155 if (mi && mi->host[0])
156 {
157 wc= mi->rli->get_worker_count();
158
159 if (wc == 0)
160 {
161 /* Single Thread Slave */
162 make_row(mi);
163 m_next_pos.set_channel_after(&m_pos);
164 channel_map.unlock();
165 return 0;
166 }
167
168 for (; m_pos.m_index_2 < wc; m_pos.next_worker())
169 {
170 /* Multi Thread Slave */
171
172 worker = mi->rli->get_worker(m_pos.m_index_2);
173 if (worker)
174 {
175 make_row(worker);
176 m_next_pos.set_after(&m_pos);
177 channel_map.unlock();
178 return 0;
179 }
180 }
181 }
182 }
183
184 channel_map.unlock();
185 return HA_ERR_END_OF_FILE;
186 }
187
rnd_pos(const void * pos)188 int table_replication_applier_status_by_worker::rnd_pos(const void *pos)
189 {
190 Slave_worker *worker;
191 Master_info *mi;
192 int res= HA_ERR_RECORD_DELETED;
193 size_t wc;
194
195 set_position(pos);
196
197 channel_map.rdlock();
198
199 mi= channel_map.get_mi_at_pos(m_pos.m_index_1);
200
201 if (!mi || !mi->rli || !mi->host[0])
202 goto end;
203
204 wc = mi->rli->get_worker_count();
205
206 if (wc == 0)
207 {
208 /* Single Thread Slave */
209 make_row(mi);
210 res=0;
211 }
212 else
213 {
214 /* Multi Thread Slave */
215 if (m_pos.m_index_2 < wc)
216 {
217 worker = mi->rli->get_worker(m_pos.m_index_2);
218 if (worker != NULL)
219 {
220 make_row(worker);
221 res=0;
222 }
223 }
224 }
225
226 end:
227 channel_map.unlock();
228 return res;
229 }
230
231 /**
232 Function to display SQL Thread's status as part of
233 'replication_applier_status_by_worker' in single threaded slave mode.
234
235 @param[in] Master_info
236
237 @retval void
238 */
make_row(Master_info * mi)239 void table_replication_applier_status_by_worker::make_row(Master_info *mi)
240 {
241 m_row_exists= false;
242
243 m_row.worker_id= 0;
244
245 m_row.thread_id= 0;
246
247 assert(mi != NULL);
248 assert(mi->rli != NULL);
249
250 mysql_mutex_lock(&mi->rli->data_lock);
251
252 m_row.channel_name_length= strlen(mi->get_channel());
253 memcpy(m_row.channel_name, (char*)mi->get_channel(), m_row.channel_name_length);
254
255 if (mi->rli->slave_running)
256 {
257 PSI_thread *psi= thd_get_psi(mi->rli->info_thd);
258 PFS_thread *pfs= reinterpret_cast<PFS_thread *> (psi);
259 if(pfs)
260 {
261 m_row.thread_id= pfs->m_thread_internal_id;
262 m_row.thread_id_is_null= false;
263 }
264 else
265 m_row.thread_id_is_null= true;
266 }
267 else
268 m_row.thread_id_is_null= true;
269
270 if (mi->rli->slave_running)
271 m_row.service_state= PS_RPL_YES;
272 else
273 m_row.service_state= PS_RPL_NO;
274
275 if (mi->rli->currently_executing_gtid.type == GTID_GROUP)
276 {
277 global_sid_lock->rdlock();
278 m_row.last_seen_transaction_length=
279 mi->rli->currently_executing_gtid.to_string(global_sid_map,
280 m_row.last_seen_transaction);
281 global_sid_lock->unlock();
282 }
283 else if (mi->rli->currently_executing_gtid.type == ANONYMOUS_GROUP)
284 {
285 m_row.last_seen_transaction_length=
286 mi->rli->currently_executing_gtid.to_string((rpl_sid *)NULL,
287 m_row.last_seen_transaction);
288 }
289 else
290 {
291 /*
292 For SQL thread currently_executing_gtid, type is set to
293 AUTOMATIC_GROUP when the SQL thread is not executing any
294 transaction. For this case, the field should be empty.
295 */
296 assert(mi->rli->currently_executing_gtid.type == AUTOMATIC_GROUP);
297 m_row.last_seen_transaction_length= 0;
298 memcpy(m_row.last_seen_transaction, "", 1);
299 }
300
301 mysql_mutex_lock(&mi->rli->err_lock);
302
303 m_row.last_error_number= (long int) mi->rli->last_error().number;
304 m_row.last_error_message_length= 0;
305 m_row.last_error_timestamp= 0;
306
307 /** if error, set error message and timestamp */
308 if (m_row.last_error_number)
309 {
310 char *temp_store= (char*) mi->rli->last_error().message;
311 m_row.last_error_message_length= strlen(temp_store);
312 memcpy(m_row.last_error_message, temp_store,
313 m_row.last_error_message_length);
314
315 /** time in millisecond since epoch */
316 m_row.last_error_timestamp= (ulonglong)mi->rli->last_error().skr*1000000;
317 }
318
319 mysql_mutex_unlock(&mi->rli->err_lock);
320 mysql_mutex_unlock(&mi->rli->data_lock);
321 m_row_exists= true;
322 }
323
make_row(Slave_worker * w)324 void table_replication_applier_status_by_worker::make_row(Slave_worker *w)
325 {
326 m_row_exists= false;
327
328 m_row.worker_id= w->get_internal_id();
329
330 m_row.thread_id= 0;
331
332 m_row.channel_name_length= strlen(w->get_channel());
333 memcpy(m_row.channel_name, (char*)w->get_channel(), m_row.channel_name_length);
334
335 mysql_mutex_lock(&w->jobs_lock);
336 if (w->running_status == Slave_worker::RUNNING)
337 {
338 PSI_thread *psi= thd_get_psi(w->info_thd);
339 PFS_thread *pfs= reinterpret_cast<PFS_thread *> (psi);
340 if(pfs)
341 {
342 m_row.thread_id= pfs->m_thread_internal_id;
343 m_row.thread_id_is_null= false;
344 }
345 else /* no instrumentation found */
346 m_row.thread_id_is_null= true;
347 }
348 else
349 m_row.thread_id_is_null= true;
350
351 if (w->running_status == Slave_worker::RUNNING)
352 m_row.service_state= PS_RPL_YES;
353 else
354 m_row.service_state= PS_RPL_NO;
355
356 m_row.last_error_number= (unsigned int) w->last_error().number;
357
358 if (w->currently_executing_gtid.type == GTID_GROUP)
359 {
360 global_sid_lock->rdlock();
361 m_row.last_seen_transaction_length=
362 w->currently_executing_gtid.to_string(global_sid_map,
363 m_row.last_seen_transaction);
364 global_sid_lock->unlock();
365 }
366 else if (w->currently_executing_gtid.type == ANONYMOUS_GROUP)
367 {
368 m_row.last_seen_transaction_length=
369 w->currently_executing_gtid.to_string((rpl_sid *)NULL,
370 m_row.last_seen_transaction);
371 }
372 else
373 {
374 /*
375 For worker->currently_executing_gtid, type is set to
376 AUTOMATIC_GROUP when the worker is not executing any
377 transaction. For this case, the field should be empty.
378 */
379 assert(w->currently_executing_gtid.type == AUTOMATIC_GROUP);
380 m_row.last_seen_transaction_length= 0;
381 memcpy(m_row.last_seen_transaction, "", 1);
382 }
383
384 m_row.last_error_number= (unsigned int) w->last_error().number;
385 m_row.last_error_message_length= 0;
386 m_row.last_error_timestamp= 0;
387
388 /** if error, set error message and timestamp */
389 if (m_row.last_error_number)
390 {
391 char * temp_store= (char*)w->last_error().message;
392 m_row.last_error_message_length= strlen(temp_store);
393 memcpy(m_row.last_error_message, w->last_error().message,
394 m_row.last_error_message_length);
395
396 /** time in millisecond since epoch */
397 m_row.last_error_timestamp= (ulonglong)w->last_error().skr*1000000;
398 }
399 mysql_mutex_unlock(&w->jobs_lock);
400
401 m_row_exists= true;
402 }
403
404 int table_replication_applier_status_by_worker
read_row_values(TABLE * table,unsigned char * buf,Field ** fields,bool read_all)405 ::read_row_values(TABLE *table, unsigned char *buf, Field **fields,
406 bool read_all)
407 {
408 Field *f;
409
410 if (unlikely(! m_row_exists))
411 return HA_ERR_RECORD_DELETED;
412
413 assert(table->s->null_bytes == 1);
414 buf[0]= 0;
415
416 for (; (f= *fields) ; fields++)
417 {
418 if (read_all || bitmap_is_set(table->read_set, f->field_index))
419 {
420 switch(f->field_index)
421 {
422 case 0: /** channel_name */
423 set_field_char_utf8(f, m_row.channel_name, m_row.channel_name_length);
424 break;
425 case 1: /*worker_id*/
426 set_field_ulonglong(f, m_row.worker_id);
427 break;
428 case 2: /*thread_id*/
429 if(m_row.thread_id_is_null)
430 f->set_null();
431 else
432 set_field_ulonglong(f, m_row.thread_id);
433 break;
434 case 3: /*service_state*/
435 set_field_enum(f, m_row.service_state);
436 break;
437 case 4: /*last_seen_transaction*/
438 set_field_char_utf8(f, m_row.last_seen_transaction, m_row.last_seen_transaction_length);
439 break;
440 case 5: /*last_error_number*/
441 set_field_ulong(f, m_row.last_error_number);
442 break;
443 case 6: /*last_error_message*/
444 set_field_varchar_utf8(f, m_row.last_error_message, m_row.last_error_message_length);
445 break;
446 case 7: /*last_error_timestamp*/
447 set_field_timestamp(f, m_row.last_error_timestamp);
448 break;
449 default:
450 assert(false);
451 }
452 }
453 }
454 return 0;
455 }
456