1 /*
2       Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3 
4       This program is free software; you can redistribute it and/or modify
5       it under the terms of the GNU General Public License, version 2.0,
6       as published by the Free Software Foundation.
7 
8       This program is also distributed with certain software (including
9       but not limited to OpenSSL) that is licensed under separate terms,
10       as designated in a particular file or component or in included license
11       documentation.  The authors of MySQL hereby grant you an additional
12       permission to link the program and your derivative works with the
13       separately licensed software that they have included with MySQL.
14 
15       This program is distributed in the hope that it will be useful,
16       but WITHOUT ANY WARRANTY; without even the implied warranty of
17       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18       GNU General Public License, version 2.0, for more details.
19 
20       You should have received a copy of the GNU General Public License
21       along with this program; if not, write to the Free Software
22       Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
23 
24 /**
25   @file storage/perfschema/table_replication_applier_status.cc
26   Table replication_applier_status (implementation).
27 */
28 
29 #define HAVE_REPLICATION
30 
31 #include "my_global.h"
32 #include "table_replication_applier_status.h"
33 #include "pfs_instr_class.h"
34 #include "pfs_instr.h"
35 #include "rpl_slave.h"
36 #include "rpl_info.h"
37 #include  "rpl_rli.h"
38 #include "rpl_mi.h"
39 #include "sql_parse.h"
40 #include "rpl_msr.h"    /*Multi source replication */
41 
/* Definition of the class-static lock object; its address is stored in m_share. */
THR_LOCK table_replication_applier_status::m_table_lock;
43 
/*
  Column definitions of the table.
  Numbers in char/varchar types count utf8 characters.
*/
/*
  One entry per column, in column order. The position of an entry in this
  array is the field index that read_row_values() switches on.
  Each entry is { column name, SQL type, {NULL, 0} }.
*/
static const TABLE_FIELD_TYPE field_types[]=
{
  /* Field 0: channel name. */
  {
    {C_STRING_WITH_LEN("CHANNEL_NAME")},
    {C_STRING_WITH_LEN("char(64)")},
    {NULL, 0}
  },
  /* Field 1: applier service state. */
  {
    {C_STRING_WITH_LEN("SERVICE_STATE")},
    {C_STRING_WITH_LEN("enum('ON','OFF')")},
    {NULL, 0}
  },
  /* Field 2: remaining delay; reported as NULL when not set. */
  {
    {C_STRING_WITH_LEN("REMAINING_DELAY")},
    {C_STRING_WITH_LEN("int")},
    {NULL, 0}
  },
  /* Field 3: number of transaction retries. */
  {
    {C_STRING_WITH_LEN("COUNT_TRANSACTIONS_RETRIES")},
    {C_STRING_WITH_LEN("bigint")},
    {NULL, 0}
  },
};
70 
71 TABLE_FIELD_DEF
72 table_replication_applier_status::m_field_def=
73 { 4, field_types };
74 
/*
  Share descriptor for the replication_applier_status table.
  The initializer is positional; the slot order must match the declaration
  of PFS_engine_table_share, which is not part of this file.
*/
PFS_engine_table_share
table_replication_applier_status::m_share=
{
  { C_STRING_WITH_LEN("replication_applier_status") }, /* table name */
  &pfs_readonly_acl, /* access control object; presumably read-only -- confirm */
  table_replication_applier_status::create, /* factory for table instances */
  NULL, /* write_row */
  NULL, /* delete_all_rows */
  table_replication_applier_status::get_row_count,    /* records */
  sizeof(pos_t), /* ref length */
  &m_table_lock, /* table lock */
  &m_field_def, /* column definitions */
  false, /* checked */
  false  /* perpetual */
};
90 
91 
create(void)92 PFS_engine_table* table_replication_applier_status::create(void)
93 {
94   return new table_replication_applier_status();
95 }
96 
table_replication_applier_status()97 table_replication_applier_status::table_replication_applier_status()
98   : PFS_engine_table(&m_share, &m_pos),
99     m_row_exists(false), m_pos(0), m_next_pos(0)
100 {}
101 
~table_replication_applier_status()102 table_replication_applier_status::~table_replication_applier_status()
103 {}
104 
reset_position(void)105 void table_replication_applier_status::reset_position(void)
106 {
107   m_pos.m_index= 0;
108   m_next_pos.m_index= 0;
109 }
110 
get_row_count()111 ha_rows table_replication_applier_status::get_row_count()
112 {
113  return channel_map.get_max_channels();
114 }
115 
116 
rnd_next(void)117 int table_replication_applier_status::rnd_next(void)
118 {
119   Master_info *mi;
120   channel_map.rdlock();
121 
122   for(m_pos.set_at(&m_next_pos);
123       m_pos.m_index < channel_map.get_max_channels();
124       m_pos.next())
125   {
126     mi= channel_map.get_mi_at_pos(m_pos.m_index);
127 
128     if (mi && mi->host[0])
129     {
130       make_row(mi);
131       m_next_pos.set_after(&m_pos);
132       channel_map.unlock();
133       return 0;
134     }
135   }
136 
137   channel_map.unlock();
138   return HA_ERR_END_OF_FILE;
139 }
140 
141 
rnd_pos(const void * pos)142 int table_replication_applier_status::rnd_pos(const void *pos)
143 {
144   Master_info *mi=NULL;
145   int res= HA_ERR_RECORD_DELETED;
146 
147   set_position(pos);
148 
149   channel_map.rdlock();
150 
151   if ((mi= channel_map.get_mi_at_pos(m_pos.m_index)))
152   {
153     make_row(mi);
154     res= 0;
155   }
156 
157   channel_map.unlock();
158   return res;
159 }
160 
make_row(Master_info * mi)161 void table_replication_applier_status::make_row(Master_info *mi)
162 {
163   char *slave_sql_running_state= NULL;
164 
165   m_row_exists= false;
166 
167   assert(mi != NULL);
168   assert(mi->rli != NULL);
169 
170   m_row.channel_name_length= mi->get_channel()? strlen(mi->get_channel()):0;
171   memcpy(m_row.channel_name, mi->get_channel(), m_row.channel_name_length);
172 
173   mysql_mutex_lock(&mi->rli->info_thd_lock);
174 
175   slave_sql_running_state= const_cast<char *>
176                            (mi->rli->info_thd ?
177                             mi->rli->info_thd->get_proc_info() : "");
178   mysql_mutex_unlock(&mi->rli->info_thd_lock);
179 
180 
181   mysql_mutex_lock(&mi->data_lock);
182   mysql_mutex_lock(&mi->rli->data_lock);
183 
184   if (mi->rli->slave_running)
185     m_row.service_state= PS_RPL_YES;
186   else
187     m_row.service_state= PS_RPL_NO;
188 
189   m_row.remaining_delay= 0;
190   if (slave_sql_running_state == stage_sql_thd_waiting_until_delay.m_name)
191   {
192     time_t t= my_time(0), sql_delay_end= mi->rli->get_sql_delay_end();
193     m_row.remaining_delay= (uint)(t < sql_delay_end ?
194                                       sql_delay_end - t : 0);
195     m_row.remaining_delay_is_set= true;
196   }
197   else
198     m_row.remaining_delay_is_set= false;
199 
200   m_row.count_transactions_retries= mi->rli->retried_trans;
201 
202   mysql_mutex_unlock(&mi->rli->data_lock);
203   mysql_mutex_unlock(&mi->data_lock);
204 
205   m_row_exists= true;
206 }
207 
read_row_values(TABLE * table,unsigned char * buf,Field ** fields,bool read_all)208 int table_replication_applier_status::read_row_values(TABLE *table,
209                                        unsigned char *buf,
210                                        Field **fields,
211                                        bool read_all)
212 {
213   Field *f;
214 
215   if (unlikely(! m_row_exists))
216     return HA_ERR_RECORD_DELETED;
217 
218   assert(table->s->null_bytes == 1);
219   buf[0]= 0;
220 
221   for (; (f= *fields) ; fields++)
222   {
223     if (read_all || bitmap_is_set(table->read_set, f->field_index))
224     {
225       switch(f->field_index)
226       {
227       case 0: /**channel_name*/
228          set_field_char_utf8(f, m_row.channel_name, m_row.channel_name_length);
229          break;
230       case 1: /* service_state */
231         set_field_enum(f, m_row.service_state);
232         break;
233       case 2: /* remaining_delay */
234         if (m_row.remaining_delay_is_set)
235           set_field_ulong(f, m_row.remaining_delay);
236         else
237           f->set_null();
238         break;
239       case 3: /* total number of times transactions were retried */
240         set_field_ulonglong(f, m_row.count_transactions_retries);
241         break;
242       default:
243         assert(false);
244       }
245     }
246   }
247   return 0;
248 }
249