1 /* Copyright (c) 2015, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 /**
24 @file storage/perfschema/table_status_by_thread.cc
25 Table STATUS_BY_THREAD (implementation).
26 */
27
28 #include "my_global.h"
29 #include "table_status_by_thread.h"
30 #include "my_thread.h"
31 #include "pfs_instr_class.h"
32 #include "pfs_column_types.h"
33 #include "pfs_column_values.h"
34 #include "pfs_global.h"
35
36 THR_LOCK table_status_by_thread::m_table_lock;
37
38 static const TABLE_FIELD_TYPE field_types[]=
39 {
40 {
41 { C_STRING_WITH_LEN("THREAD_ID") },
42 { C_STRING_WITH_LEN("bigint(20)") },
43 { NULL, 0}
44 },
45 {
46 { C_STRING_WITH_LEN("VARIABLE_NAME") },
47 { C_STRING_WITH_LEN("varchar(64)") },
48 { NULL, 0}
49 },
50 {
51 { C_STRING_WITH_LEN("VARIABLE_VALUE") },
52 { C_STRING_WITH_LEN("varchar(1024)") },
53 { NULL, 0}
54 }
55 };
56
57 TABLE_FIELD_DEF
58 table_status_by_thread::m_field_def=
59 { 3, field_types };
60
61 PFS_engine_table_share
62 table_status_by_thread::m_share=
63 {
64 { C_STRING_WITH_LEN("status_by_thread") },
65 &pfs_truncatable_acl,
66 table_status_by_thread::create,
67 NULL, /* write_row */
68 table_status_by_thread::delete_all_rows,
69 table_status_by_thread::get_row_count,
70 sizeof(pos_t),
71 &m_table_lock,
72 &m_field_def,
73 false, /* checked */
74 false /* perpetual */
75 };
76
77 PFS_engine_table*
create(void)78 table_status_by_thread::create(void)
79 {
80 return new table_status_by_thread();
81 }
82
delete_all_rows(void)83 int table_status_by_thread::delete_all_rows(void)
84 {
85 /* Lock required to aggregate to global_status_vars. */
86 mysql_mutex_lock(&LOCK_status);
87
88 reset_status_by_thread();
89
90 mysql_mutex_unlock(&LOCK_status);
91 return 0;
92 }
93
get_row_count(void)94 ha_rows table_status_by_thread::get_row_count(void)
95 {
96 mysql_mutex_lock(&LOCK_status);
97 size_t status_var_count= all_status_vars.size();
98 mysql_mutex_unlock(&LOCK_status);
99 return (global_thread_container.get_row_count() * status_var_count);
100 }
101
table_status_by_thread()102 table_status_by_thread::table_status_by_thread()
103 : PFS_engine_table(&m_share, &m_pos),
104 m_status_cache(true), m_row_exists(false), m_pos(), m_next_pos()
105 {}
106
reset_position(void)107 void table_status_by_thread::reset_position(void)
108 {
109 m_pos.reset();
110 m_next_pos.reset();
111 }
112
rnd_init(bool scan)113 int table_status_by_thread::rnd_init(bool scan)
114 {
115 if (show_compatibility_56)
116 return 0;
117
118 /*
119 Build array of SHOW_VARs from the global status array prior to materializing
120 threads in rnd_next() or rnd_pos().
121 */
122 m_status_cache.initialize_session();
123
124 /* Record the current number of status variables to detect subsequent changes. */
125 ulonglong status_version= m_status_cache.get_status_array_version();
126
127 /*
128 The table context holds the current version of the global status array
129 and a record of which threads were materialized. If scan == true, then
130 allocate a new context from mem_root and store in TLS. If scan == false,
131 then restore from TLS.
132 */
133 m_context= (table_status_by_thread_context *)current_thd->alloc(sizeof(table_status_by_thread_context));
134 new(m_context) table_status_by_thread_context(status_version, !scan);
135 return 0;
136 }
137
rnd_next(void)138 int table_status_by_thread::rnd_next(void)
139 {
140 if (show_compatibility_56)
141 return HA_ERR_END_OF_FILE;
142
143 /* If global status array changes, exit with warning. */ // TODO: Issue warning
144 if (!m_context->versions_match())
145 return HA_ERR_END_OF_FILE;
146
147 bool has_more_thread= true;
148
149 for (m_pos.set_at(&m_next_pos);
150 has_more_thread;
151 m_pos.next_thread())
152 {
153 PFS_thread *pfs_thread= global_thread_container.get(m_pos.m_index_1, &has_more_thread);
154 if (m_status_cache.materialize_session(pfs_thread) == 0)
155 {
156 /* Mark this thread as materialized. */
157 m_context->set_item(m_pos.m_index_1);
158 const Status_variable *stat_var= m_status_cache.get(m_pos.m_index_2);
159 if (stat_var != NULL)
160 {
161 make_row(pfs_thread, stat_var);
162 m_next_pos.set_after(&m_pos);
163 return 0;
164 }
165 }
166 }
167 return HA_ERR_END_OF_FILE;
168 }
169
170 int
rnd_pos(const void * pos)171 table_status_by_thread::rnd_pos(const void *pos)
172 {
173 if (show_compatibility_56)
174 return HA_ERR_RECORD_DELETED;
175
176 /* If global status array has changed, do nothing. */
177 if (!m_context->versions_match())
178 return HA_ERR_RECORD_DELETED;
179
180 set_position(pos);
181 assert(m_pos.m_index_1 < global_thread_container.get_row_count());
182
183 PFS_thread *pfs_thread= global_thread_container.get(m_pos.m_index_1);
184 /*
185 Only materialize threads that were previously materialized by rnd_next().
186 If a thread cannot be rematerialized, then do nothing.
187 */
188 if (m_context->is_item_set(m_pos.m_index_1) &&
189 m_status_cache.materialize_session(pfs_thread) == 0)
190 {
191 const Status_variable *stat_var= m_status_cache.get(m_pos.m_index_2);
192 if (stat_var != NULL)
193 {
194 make_row(pfs_thread, stat_var);
195 return 0;
196 }
197 }
198 return HA_ERR_RECORD_DELETED;
199 }
200
201 void table_status_by_thread
make_row(PFS_thread * thread,const Status_variable * status_var)202 ::make_row(PFS_thread *thread, const Status_variable *status_var)
203 {
204 pfs_optimistic_state lock;
205 m_row_exists= false;
206 if (status_var->is_null())
207 return;
208
209 /* Protect this reader against a thread termination */
210 thread->m_lock.begin_optimistic_lock(&lock);
211
212 m_row.m_thread_internal_id= thread->m_thread_internal_id;
213 m_row.m_variable_name.make_row(status_var->m_name, status_var->m_name_length);
214 m_row.m_variable_value.make_row(status_var);
215
216 if (!thread->m_lock.end_optimistic_lock(&lock))
217 return;
218
219 m_row_exists= true;
220 }
221
222 int table_status_by_thread
read_row_values(TABLE * table,unsigned char * buf,Field ** fields,bool read_all)223 ::read_row_values(TABLE *table,
224 unsigned char *buf,
225 Field **fields,
226 bool read_all)
227 {
228 Field *f;
229
230 if (unlikely(! m_row_exists))
231 return HA_ERR_RECORD_DELETED;
232
233 /* Set the null bits */
234 assert(table->s->null_bytes == 1);
235 buf[0]= 0;
236
237 for (; (f= *fields) ; fields++)
238 {
239 if (read_all || bitmap_is_set(table->read_set, f->field_index))
240 {
241 switch(f->field_index)
242 {
243 case 0: /* THREAD_ID */
244 set_field_ulonglong(f, m_row.m_thread_internal_id);
245 break;
246 case 1: /* VARIABLE_NAME */
247 set_field_varchar_utf8(f, m_row.m_variable_name.m_str, m_row.m_variable_name.m_length);
248 break;
249 case 2: /* VARIABLE_VALUE */
250 m_row.m_variable_value.set_field(f);
251 break;
252 default:
253 assert(false);
254 }
255 }
256 }
257
258 return 0;
259 }
260
261