1 /*
2 Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
23
24 /**
25 @file storage/perfschema/table_replication_applier_status.cc
26 Table replication_applier_status (implementation).
27 */
28
29 #define HAVE_REPLICATION
30
31 #include "my_global.h"
32 #include "table_replication_applier_status.h"
33 #include "pfs_instr_class.h"
34 #include "pfs_instr.h"
35 #include "rpl_slave.h"
36 #include "rpl_info.h"
37 #include "rpl_rli.h"
38 #include "rpl_mi.h"
39 #include "sql_parse.h"
40 #include "rpl_msr.h" /*Multi source replication */
41
42 THR_LOCK table_replication_applier_status::m_table_lock;
43
44 /*
45 numbers in varchar count utf8 characters.
46 */
47 static const TABLE_FIELD_TYPE field_types[]=
48 {
49 {
50 {C_STRING_WITH_LEN("CHANNEL_NAME")},
51 {C_STRING_WITH_LEN("char(64)")},
52 {NULL, 0}
53 },
54 {
55 {C_STRING_WITH_LEN("SERVICE_STATE")},
56 {C_STRING_WITH_LEN("enum('ON','OFF')")},
57 {NULL, 0}
58 },
59 {
60 {C_STRING_WITH_LEN("REMAINING_DELAY")},
61 {C_STRING_WITH_LEN("int")},
62 {NULL, 0}
63 },
64 {
65 {C_STRING_WITH_LEN("COUNT_TRANSACTIONS_RETRIES")},
66 {C_STRING_WITH_LEN("bigint")},
67 {NULL, 0}
68 },
69 };
70
71 TABLE_FIELD_DEF
72 table_replication_applier_status::m_field_def=
73 { 4, field_types };
74
75 PFS_engine_table_share
76 table_replication_applier_status::m_share=
77 {
78 { C_STRING_WITH_LEN("replication_applier_status") },
79 &pfs_readonly_acl,
80 table_replication_applier_status::create,
81 NULL, /* write_row */
82 NULL, /* delete_all_rows */
83 table_replication_applier_status::get_row_count, /* records */
84 sizeof(pos_t), /* ref length */
85 &m_table_lock,
86 &m_field_def,
87 false, /* checked */
88 false /* perpetual */
89 };
90
91
create(void)92 PFS_engine_table* table_replication_applier_status::create(void)
93 {
94 return new table_replication_applier_status();
95 }
96
table_replication_applier_status()97 table_replication_applier_status::table_replication_applier_status()
98 : PFS_engine_table(&m_share, &m_pos),
99 m_row_exists(false), m_pos(0), m_next_pos(0)
100 {}
101
~table_replication_applier_status()102 table_replication_applier_status::~table_replication_applier_status()
103 {}
104
reset_position(void)105 void table_replication_applier_status::reset_position(void)
106 {
107 m_pos.m_index= 0;
108 m_next_pos.m_index= 0;
109 }
110
get_row_count()111 ha_rows table_replication_applier_status::get_row_count()
112 {
113 return channel_map.get_max_channels();
114 }
115
116
rnd_next(void)117 int table_replication_applier_status::rnd_next(void)
118 {
119 Master_info *mi;
120 channel_map.rdlock();
121
122 for(m_pos.set_at(&m_next_pos);
123 m_pos.m_index < channel_map.get_max_channels();
124 m_pos.next())
125 {
126 mi= channel_map.get_mi_at_pos(m_pos.m_index);
127
128 if (mi && mi->host[0])
129 {
130 make_row(mi);
131 m_next_pos.set_after(&m_pos);
132 channel_map.unlock();
133 return 0;
134 }
135 }
136
137 channel_map.unlock();
138 return HA_ERR_END_OF_FILE;
139 }
140
141
rnd_pos(const void * pos)142 int table_replication_applier_status::rnd_pos(const void *pos)
143 {
144 Master_info *mi=NULL;
145 int res= HA_ERR_RECORD_DELETED;
146
147 set_position(pos);
148
149 channel_map.rdlock();
150
151 if ((mi= channel_map.get_mi_at_pos(m_pos.m_index)))
152 {
153 make_row(mi);
154 res= 0;
155 }
156
157 channel_map.unlock();
158 return res;
159 }
160
make_row(Master_info * mi)161 void table_replication_applier_status::make_row(Master_info *mi)
162 {
163 char *slave_sql_running_state= NULL;
164
165 m_row_exists= false;
166
167 assert(mi != NULL);
168 assert(mi->rli != NULL);
169
170 m_row.channel_name_length= mi->get_channel()? strlen(mi->get_channel()):0;
171 memcpy(m_row.channel_name, mi->get_channel(), m_row.channel_name_length);
172
173 mysql_mutex_lock(&mi->rli->info_thd_lock);
174
175 slave_sql_running_state= const_cast<char *>
176 (mi->rli->info_thd ?
177 mi->rli->info_thd->get_proc_info() : "");
178 mysql_mutex_unlock(&mi->rli->info_thd_lock);
179
180
181 mysql_mutex_lock(&mi->data_lock);
182 mysql_mutex_lock(&mi->rli->data_lock);
183
184 if (mi->rli->slave_running)
185 m_row.service_state= PS_RPL_YES;
186 else
187 m_row.service_state= PS_RPL_NO;
188
189 m_row.remaining_delay= 0;
190 if (slave_sql_running_state == stage_sql_thd_waiting_until_delay.m_name)
191 {
192 time_t t= my_time(0), sql_delay_end= mi->rli->get_sql_delay_end();
193 m_row.remaining_delay= (uint)(t < sql_delay_end ?
194 sql_delay_end - t : 0);
195 m_row.remaining_delay_is_set= true;
196 }
197 else
198 m_row.remaining_delay_is_set= false;
199
200 m_row.count_transactions_retries= mi->rli->retried_trans;
201
202 mysql_mutex_unlock(&mi->rli->data_lock);
203 mysql_mutex_unlock(&mi->data_lock);
204
205 m_row_exists= true;
206 }
207
read_row_values(TABLE * table,unsigned char * buf,Field ** fields,bool read_all)208 int table_replication_applier_status::read_row_values(TABLE *table,
209 unsigned char *buf,
210 Field **fields,
211 bool read_all)
212 {
213 Field *f;
214
215 if (unlikely(! m_row_exists))
216 return HA_ERR_RECORD_DELETED;
217
218 assert(table->s->null_bytes == 1);
219 buf[0]= 0;
220
221 for (; (f= *fields) ; fields++)
222 {
223 if (read_all || bitmap_is_set(table->read_set, f->field_index))
224 {
225 switch(f->field_index)
226 {
227 case 0: /**channel_name*/
228 set_field_char_utf8(f, m_row.channel_name, m_row.channel_name_length);
229 break;
230 case 1: /* service_state */
231 set_field_enum(f, m_row.service_state);
232 break;
233 case 2: /* remaining_delay */
234 if (m_row.remaining_delay_is_set)
235 set_field_ulong(f, m_row.remaining_delay);
236 else
237 f->set_null();
238 break;
239 case 3: /* total number of times transactions were retried */
240 set_field_ulonglong(f, m_row.count_transactions_retries);
241 break;
242 default:
243 assert(false);
244 }
245 }
246 }
247 return 0;
248 }
249