1 /*
2       Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3 
4       This program is free software; you can redistribute it and/or modify
5       it under the terms of the GNU General Public License, version 2.0,
6       as published by the Free Software Foundation.
7 
8       This program is also distributed with certain software (including
9       but not limited to OpenSSL) that is licensed under separate terms,
10       as designated in a particular file or component or in included license
11       documentation.  The authors of MySQL hereby grant you an additional
12       permission to link the program and your derivative works with the
13       separately licensed software that they have included with MySQL.
14 
15       This program is distributed in the hope that it will be useful,
16       but WITHOUT ANY WARRANTY; without even the implied warranty of
17       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18       GNU General Public License, version 2.0, for more details.
19 
20       You should have received a copy of the GNU General Public License
21       along with this program; if not, write to the Free Software
22       Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
23 
24 /**
25   @file storage/perfschema/table_replication_applier_status_by_worker.cc
26   Table replication_applier_status_by_worker (implementation).
27 */
28 
29 #define HAVE_REPLICATION
30 
31 #include "my_global.h"
32 #include "table_replication_applier_status_by_worker.h"
33 #include "pfs_instr_class.h"
34 #include "pfs_instr.h"
35 #include "rpl_slave.h"
36 #include "rpl_info.h"
37 #include  "rpl_rli.h"
38 #include "rpl_mi.h"
39 #include "sql_parse.h"
40 #include "rpl_rli_pdb.h"
41 #include "rpl_msr.h"    /*Multi source replication */
42 
/** Table lock object; its address is stored in the share definition (m_share). */
THR_LOCK table_replication_applier_status_by_worker::m_table_lock;
44 
45 /* numbers in varchar count utf8 characters. */
46 static const TABLE_FIELD_TYPE field_types[]=
47 {
48 
49   {
50     {C_STRING_WITH_LEN("CHANNEL_NAME")},
51     {C_STRING_WITH_LEN("char(64)")},
52     {NULL, 0}
53   },
54   {
55     {C_STRING_WITH_LEN("WORKER_ID")},
56     {C_STRING_WITH_LEN("bigint")},
57     {NULL, 0}
58   },
59   {
60     {C_STRING_WITH_LEN("THREAD_ID")},
61     {C_STRING_WITH_LEN("bigint")},
62     {NULL, 0}
63   },
64   {
65     {C_STRING_WITH_LEN("SERVICE_STATE")},
66     {C_STRING_WITH_LEN("enum('ON','OFF')")},
67     {NULL, 0}
68   },
69   {
70     {C_STRING_WITH_LEN("LAST_SEEN_TRANSACTION")},
71     {C_STRING_WITH_LEN("char(57)")},
72     {NULL, 0}
73   },
74   {
75     {C_STRING_WITH_LEN("LAST_ERROR_NUMBER")},
76     {C_STRING_WITH_LEN("int(11)")},
77     {NULL, 0}
78   },
79   {
80     {C_STRING_WITH_LEN("LAST_ERROR_MESSAGE")},
81     {C_STRING_WITH_LEN("varchar(1024)")},
82     {NULL, 0}
83   },
84   {
85     {C_STRING_WITH_LEN("LAST_ERROR_TIMESTAMP")},
86     {C_STRING_WITH_LEN("timestamp")},
87     {NULL, 0}
88   },
89 };
90 
91 TABLE_FIELD_DEF
92 table_replication_applier_status_by_worker::m_field_def=
93 { 8, field_types };
94 
/**
  Share descriptor of the table: table name, ACL, row-cursor factory,
  row-count estimator, size of a saved position, table lock and field
  definition. The write_row and delete_all_rows slots are NULL.
*/
PFS_engine_table_share
table_replication_applier_status_by_worker::m_share=
{
  { C_STRING_WITH_LEN("replication_applier_status_by_worker") }, /* table name */
  &pfs_readonly_acl, /* ACL (presumably read-only, going by its name) */
  table_replication_applier_status_by_worker::create, /* cursor factory */
  NULL, /* write_row */
  NULL, /* delete_all_rows */
  table_replication_applier_status_by_worker::get_row_count, /*records*/
  sizeof(pos_t), /* ref length */
  &m_table_lock,
  &m_field_def,
  false, /* checked */
  false  /* perpetual */
};
110 
create(void)111 PFS_engine_table* table_replication_applier_status_by_worker::create(void)
112 {
113   return new table_replication_applier_status_by_worker();
114 }
115 
/**
  Construct a cursor with no current row and default-constructed scan
  positions. The base class receives the share and the address of m_pos.
*/
table_replication_applier_status_by_worker
  ::table_replication_applier_status_by_worker()
  : PFS_engine_table(&m_share, &m_pos),
    m_row_exists(false), m_pos(), m_next_pos()
{}
121 
/** Destructor; the body is empty, nothing is released explicitly here. */
table_replication_applier_status_by_worker
  ::~table_replication_applier_status_by_worker()
{}
125 
/**
  Reset both the current position (m_pos) and the position the next
  rnd_next() call starts from (m_next_pos).
*/
void table_replication_applier_status_by_worker::reset_position(void)
{
  m_pos.reset();
  m_next_pos.reset();
}
131 
get_row_count()132 ha_rows table_replication_applier_status_by_worker::get_row_count()
133 {
134   /*
135     Return an estimate, number of master info's multipled by worker threads
136   */
137  return channel_map.get_max_channels()*32;
138 }
139 
140 
rnd_next(void)141 int table_replication_applier_status_by_worker::rnd_next(void)
142 {
143   Slave_worker *worker;
144   Master_info *mi;
145   size_t wc;
146 
147   channel_map.rdlock();
148 
149   for (m_pos.set_at(&m_next_pos);
150        m_pos.has_more_channels(channel_map.get_max_channels());
151        m_pos.next_channel())
152   {
153     mi= channel_map.get_mi_at_pos(m_pos.m_index_1);
154 
155     if (mi && mi->host[0])
156     {
157       wc= mi->rli->get_worker_count();
158 
159       if (wc == 0)
160       {
161         /* Single Thread Slave */
162         make_row(mi);
163         m_next_pos.set_channel_after(&m_pos);
164         channel_map.unlock();
165         return 0;
166       }
167 
168       for (; m_pos.m_index_2 < wc; m_pos.next_worker())
169       {
170         /* Multi Thread Slave */
171 
172         worker = mi->rli->get_worker(m_pos.m_index_2);
173         if (worker)
174         {
175           make_row(worker);
176           m_next_pos.set_after(&m_pos);
177           channel_map.unlock();
178           return 0;
179         }
180       }
181     }
182   }
183 
184   channel_map.unlock();
185   return HA_ERR_END_OF_FILE;
186 }
187 
rnd_pos(const void * pos)188 int table_replication_applier_status_by_worker::rnd_pos(const void *pos)
189 {
190   Slave_worker *worker;
191   Master_info *mi;
192   int res= HA_ERR_RECORD_DELETED;
193   size_t wc;
194 
195   set_position(pos);
196 
197   channel_map.rdlock();
198 
199   mi= channel_map.get_mi_at_pos(m_pos.m_index_1);
200 
201   if (!mi || !mi->rli || !mi->host[0])
202     goto end;
203 
204   wc = mi->rli->get_worker_count();
205 
206   if (wc == 0)
207   {
208     /* Single Thread Slave */
209     make_row(mi);
210     res=0;
211   }
212   else
213   {
214     /* Multi Thread Slave */
215     if (m_pos.m_index_2 < wc)
216     {
217       worker = mi->rli->get_worker(m_pos.m_index_2);
218       if (worker != NULL)
219       {
220         make_row(worker);
221         res=0;
222       }
223     }
224   }
225 
226 end:
227   channel_map.unlock();
228   return res;
229 }
230 
231 /**
232    Function to display SQL Thread's status as part of
233    'replication_applier_status_by_worker' in single threaded slave mode.
234 
235    @param[in] Master_info
236 
237    @retval void
238 */
make_row(Master_info * mi)239 void table_replication_applier_status_by_worker::make_row(Master_info *mi)
240 {
241   m_row_exists= false;
242 
243   m_row.worker_id= 0;
244 
245   m_row.thread_id= 0;
246 
247   assert(mi != NULL);
248   assert(mi->rli != NULL);
249 
250   mysql_mutex_lock(&mi->rli->data_lock);
251 
252   m_row.channel_name_length= strlen(mi->get_channel());
253   memcpy(m_row.channel_name, (char*)mi->get_channel(), m_row.channel_name_length);
254 
255   if (mi->rli->slave_running)
256   {
257     PSI_thread *psi= thd_get_psi(mi->rli->info_thd);
258     PFS_thread *pfs= reinterpret_cast<PFS_thread *> (psi);
259     if(pfs)
260     {
261       m_row.thread_id= pfs->m_thread_internal_id;
262       m_row.thread_id_is_null= false;
263     }
264     else
265       m_row.thread_id_is_null= true;
266   }
267   else
268     m_row.thread_id_is_null= true;
269 
270   if (mi->rli->slave_running)
271     m_row.service_state= PS_RPL_YES;
272   else
273     m_row.service_state= PS_RPL_NO;
274 
275   if (mi->rli->currently_executing_gtid.type == GTID_GROUP)
276   {
277     global_sid_lock->rdlock();
278     m_row.last_seen_transaction_length=
279       mi->rli->currently_executing_gtid.to_string(global_sid_map,
280                                             m_row.last_seen_transaction);
281     global_sid_lock->unlock();
282   }
283   else if (mi->rli->currently_executing_gtid.type == ANONYMOUS_GROUP)
284   {
285     m_row.last_seen_transaction_length=
286       mi->rli->currently_executing_gtid.to_string((rpl_sid *)NULL,
287                                             m_row.last_seen_transaction);
288   }
289   else
290   {
291     /*
292       For SQL thread currently_executing_gtid, type is set to
293       AUTOMATIC_GROUP when the SQL thread is not executing any
294       transaction.  For this case, the field should be empty.
295     */
296     assert(mi->rli->currently_executing_gtid.type == AUTOMATIC_GROUP);
297     m_row.last_seen_transaction_length= 0;
298     memcpy(m_row.last_seen_transaction, "", 1);
299   }
300 
301   mysql_mutex_lock(&mi->rli->err_lock);
302 
303   m_row.last_error_number= (long int) mi->rli->last_error().number;
304   m_row.last_error_message_length= 0;
305   m_row.last_error_timestamp= 0;
306 
307   /** if error, set error message and timestamp */
308   if (m_row.last_error_number)
309   {
310     char *temp_store= (char*) mi->rli->last_error().message;
311     m_row.last_error_message_length= strlen(temp_store);
312     memcpy(m_row.last_error_message, temp_store,
313            m_row.last_error_message_length);
314 
315     /** time in millisecond since epoch */
316     m_row.last_error_timestamp= (ulonglong)mi->rli->last_error().skr*1000000;
317   }
318 
319   mysql_mutex_unlock(&mi->rli->err_lock);
320   mysql_mutex_unlock(&mi->rli->data_lock);
321   m_row_exists= true;
322 }
323 
make_row(Slave_worker * w)324 void table_replication_applier_status_by_worker::make_row(Slave_worker *w)
325 {
326   m_row_exists= false;
327 
328   m_row.worker_id= w->get_internal_id();
329 
330   m_row.thread_id= 0;
331 
332   m_row.channel_name_length= strlen(w->get_channel());
333   memcpy(m_row.channel_name, (char*)w->get_channel(), m_row.channel_name_length);
334 
335   mysql_mutex_lock(&w->jobs_lock);
336   if (w->running_status == Slave_worker::RUNNING)
337   {
338     PSI_thread *psi= thd_get_psi(w->info_thd);
339     PFS_thread *pfs= reinterpret_cast<PFS_thread *> (psi);
340     if(pfs)
341     {
342       m_row.thread_id= pfs->m_thread_internal_id;
343       m_row.thread_id_is_null= false;
344     }
345     else /* no instrumentation found */
346       m_row.thread_id_is_null= true;
347   }
348   else
349     m_row.thread_id_is_null= true;
350 
351   if (w->running_status == Slave_worker::RUNNING)
352     m_row.service_state= PS_RPL_YES;
353   else
354     m_row.service_state= PS_RPL_NO;
355 
356   m_row.last_error_number= (unsigned int) w->last_error().number;
357 
358   if (w->currently_executing_gtid.type == GTID_GROUP)
359   {
360     global_sid_lock->rdlock();
361     m_row.last_seen_transaction_length=
362       w->currently_executing_gtid.to_string(global_sid_map,
363                                             m_row.last_seen_transaction);
364     global_sid_lock->unlock();
365   }
366   else if (w->currently_executing_gtid.type == ANONYMOUS_GROUP)
367   {
368     m_row.last_seen_transaction_length=
369       w->currently_executing_gtid.to_string((rpl_sid *)NULL,
370                                             m_row.last_seen_transaction);
371   }
372   else
373   {
374     /*
375       For worker->currently_executing_gtid, type is set to
376       AUTOMATIC_GROUP when the worker is not executing any
377       transaction.  For this case, the field should be empty.
378     */
379     assert(w->currently_executing_gtid.type == AUTOMATIC_GROUP);
380     m_row.last_seen_transaction_length= 0;
381     memcpy(m_row.last_seen_transaction, "", 1);
382   }
383 
384   m_row.last_error_number= (unsigned int) w->last_error().number;
385   m_row.last_error_message_length= 0;
386   m_row.last_error_timestamp= 0;
387 
388   /** if error, set error message and timestamp */
389   if (m_row.last_error_number)
390   {
391     char * temp_store= (char*)w->last_error().message;
392     m_row.last_error_message_length= strlen(temp_store);
393     memcpy(m_row.last_error_message, w->last_error().message,
394            m_row.last_error_message_length);
395 
396     /** time in millisecond since epoch */
397     m_row.last_error_timestamp= (ulonglong)w->last_error().skr*1000000;
398   }
399   mysql_mutex_unlock(&w->jobs_lock);
400 
401   m_row_exists= true;
402 }
403 
404 int table_replication_applier_status_by_worker
read_row_values(TABLE * table,unsigned char * buf,Field ** fields,bool read_all)405   ::read_row_values(TABLE *table, unsigned char *buf,  Field **fields,
406                     bool read_all)
407 {
408   Field *f;
409 
410   if (unlikely(! m_row_exists))
411     return HA_ERR_RECORD_DELETED;
412 
413   assert(table->s->null_bytes == 1);
414   buf[0]= 0;
415 
416   for (; (f= *fields) ; fields++)
417   {
418     if (read_all || bitmap_is_set(table->read_set, f->field_index))
419     {
420       switch(f->field_index)
421       {
422       case 0: /** channel_name */
423         set_field_char_utf8(f, m_row.channel_name, m_row.channel_name_length);
424         break;
425       case 1: /*worker_id*/
426         set_field_ulonglong(f, m_row.worker_id);
427         break;
428       case 2: /*thread_id*/
429         if(m_row.thread_id_is_null)
430           f->set_null();
431         else
432           set_field_ulonglong(f, m_row.thread_id);
433         break;
434       case 3: /*service_state*/
435         set_field_enum(f, m_row.service_state);
436         break;
437       case 4: /*last_seen_transaction*/
438         set_field_char_utf8(f, m_row.last_seen_transaction, m_row.last_seen_transaction_length);
439         break;
440       case 5: /*last_error_number*/
441         set_field_ulong(f, m_row.last_error_number);
442         break;
443       case 6: /*last_error_message*/
444         set_field_varchar_utf8(f, m_row.last_error_message, m_row.last_error_message_length);
445         break;
446       case 7: /*last_error_timestamp*/
447         set_field_timestamp(f, m_row.last_error_timestamp);
448         break;
449       default:
450         assert(false);
451       }
452     }
453   }
454   return 0;
455 }
456