1 /* Copyright (c) 2008, 2021, Oracle and/or its affiliates.
2 
3   This program is free software; you can redistribute it and/or modify
4   it under the terms of the GNU General Public License, version 2.0,
5   as published by the Free Software Foundation.
6 
7   This program is also distributed with certain software (including
8   but not limited to OpenSSL) that is licensed under separate terms,
9   as designated in a particular file or component or in included license
10   documentation.  The authors of MySQL hereby grant you an additional
11   permission to link the program and your derivative works with the
12   separately licensed software that they have included with MySQL.
13 
14   This program is distributed in the hope that it will be useful,
15   but WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   GNU General Public License, version 2.0, for more details.
18 
19   You should have received a copy of the GNU General Public License
20   along with this program; if not, write to the Free Software Foundation,
21   51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
22 
/**
  @file storage/perfschema/pfs_digest.cc
  Statement Digest data structures (implementation).
*/
27 
28 /*
29   This code needs extra visibility in the lexer structures
30 */
31 
32 #include "my_global.h"
33 #include "my_sys.h"
34 #include "pfs_instr.h"
35 #include "pfs_digest.h"
36 #include "pfs_global.h"
37 #include "pfs_builtin_memory.h"
38 #include "table_helper.h"
39 #include "sql_lex.h"
40 #include "sql_signal.h"
41 #include "sql_get_diagnostics.h"
42 #include "sql_string.h"
43 #include <string.h>
44 
/** Number of records in statements_digest_stat_array, taken from the digest sizing. */
size_t digest_max= 0;
/**
  Number of statements that could not be recorded in a record of their own
  (array full, hash insert failure, or too many insert retries).
*/
ulong digest_lost= 0;

/** EVENTS_STATEMENTS_SUMMARY_BY_DIGEST buffer. */
PFS_statements_digest_stat *statements_digest_stat_array= NULL;
/** Token storage, split in slices of pfs_max_digest_length bytes, one per record. */
static unsigned char *statements_digest_token_array= NULL;
/** Consumer flag for table EVENTS_STATEMENTS_SUMMARY_BY_DIGEST. */
bool flag_statements_digest= true;
/**
  Monotonic counter used, modulo digest_max, to pick the index in the
  Stat array where a new record is to be inserted.
  Index 0 is reserved for the "all else" case when the entire array is full.
*/
static PFS_ALIGNED PFS_cacheline_uint32 digest_monotonic_index;
/** Set once a search for a free record in the Stat array has failed. */
bool digest_full= false;

/** Hash index on the digest records, keyed by PFS_digest_key. */
LF_HASH digest_hash;
/** True while digest_hash is initialized. */
static bool digest_hash_inited= false;
62 
63 /**
64   Initialize table EVENTS_STATEMENTS_SUMMARY_BY_DIGEST.
65   @param param performance schema sizing
66 */
init_digest(const PFS_global_param * param)67 int init_digest(const PFS_global_param *param)
68 {
69   /*
70     Allocate memory for statements_digest_stat_array based on
71     performance_schema_digests_size values
72   */
73   digest_max= param->m_digest_sizing;
74   digest_lost= 0;
75   PFS_atomic::store_u32(& digest_monotonic_index.m_u32, 1);
76   digest_full= false;
77 
78   if (digest_max == 0)
79     return 0;
80 
81   statements_digest_stat_array=
82     PFS_MALLOC_ARRAY(& builtin_memory_digest,
83                      digest_max,
84                      sizeof(PFS_statements_digest_stat), PFS_statements_digest_stat,
85                      MYF(MY_ZEROFILL));
86 
87   if (unlikely(statements_digest_stat_array == NULL))
88   {
89     cleanup_digest();
90     return 1;
91   }
92 
93   if (pfs_max_digest_length > 0)
94   {
95     /* Size of each digest array. */
96     size_t digest_memory_size= pfs_max_digest_length * sizeof(unsigned char);
97 
98     statements_digest_token_array=
99       PFS_MALLOC_ARRAY(& builtin_memory_digest_tokens,
100                        digest_max,
101                        digest_memory_size,
102                        unsigned char,
103                        MYF(MY_ZEROFILL));
104 
105     if (unlikely(statements_digest_token_array == NULL))
106     {
107       cleanup_digest();
108       return 1;
109     }
110   }
111 
112   for (size_t index= 0; index < digest_max; index++)
113   {
114     statements_digest_stat_array[index].reset_data(statements_digest_token_array
115                                                    + index * pfs_max_digest_length, pfs_max_digest_length);
116   }
117 
118   /* Set record[0] as allocated. */
119   statements_digest_stat_array[0].m_lock.set_allocated();
120 
121   return 0;
122 }
123 
124 /** Cleanup table EVENTS_STATEMENTS_SUMMARY_BY_DIGEST. */
cleanup_digest(void)125 void cleanup_digest(void)
126 {
127   PFS_FREE_ARRAY(& builtin_memory_digest,
128                  digest_max,
129                  sizeof(PFS_statements_digest_stat),
130                  statements_digest_stat_array);
131 
132   PFS_FREE_ARRAY(& builtin_memory_digest_tokens,
133                  digest_max,
134                  (pfs_max_digest_length * sizeof(unsigned char)),
135                  statements_digest_token_array);
136 
137   statements_digest_stat_array= NULL;
138   statements_digest_token_array= NULL;
139 }
140 
141 C_MODE_START
digest_hash_get_key(const uchar * entry,size_t * length,my_bool)142 static uchar *digest_hash_get_key(const uchar *entry, size_t *length,
143                                   my_bool)
144 {
145   const PFS_statements_digest_stat * const *typed_entry;
146   const PFS_statements_digest_stat *digest;
147   const void *result;
148   typed_entry= reinterpret_cast<const PFS_statements_digest_stat*const*>(entry);
149   assert(typed_entry != NULL);
150   digest= *typed_entry;
151   assert(digest != NULL);
152   *length= sizeof (PFS_digest_key);
153   result= & digest->m_digest_key;
154   return const_cast<uchar*> (reinterpret_cast<const uchar*> (result));
155 }
156 C_MODE_END
157 
158 
159 /**
160   Initialize the digest hash.
161   @return 0 on success
162 */
init_digest_hash(const PFS_global_param * param)163 int init_digest_hash(const PFS_global_param *param)
164 {
165   if ((! digest_hash_inited) && (param->m_digest_sizing != 0))
166   {
167     lf_hash_init(&digest_hash, sizeof(PFS_statements_digest_stat*),
168                  LF_HASH_UNIQUE, 0, 0, digest_hash_get_key,
169                  &my_charset_bin);
170     digest_hash_inited= true;
171   }
172   return 0;
173 }
174 
cleanup_digest_hash(void)175 void cleanup_digest_hash(void)
176 {
177   if (digest_hash_inited)
178   {
179     lf_hash_destroy(&digest_hash);
180     digest_hash_inited= false;
181   }
182 }
183 
get_digest_hash_pins(PFS_thread * thread)184 static LF_PINS* get_digest_hash_pins(PFS_thread *thread)
185 {
186   if (unlikely(thread->m_digest_hash_pins == NULL))
187   {
188     if (!digest_hash_inited)
189       return NULL;
190     thread->m_digest_hash_pins= lf_hash_get_pins(&digest_hash);
191   }
192   return thread->m_digest_hash_pins;
193 }
194 
195 PFS_statement_stat*
find_or_create_digest(PFS_thread * thread,const sql_digest_storage * digest_storage,const char * schema_name,uint schema_name_length)196 find_or_create_digest(PFS_thread *thread,
197                       const sql_digest_storage *digest_storage,
198                       const char *schema_name,
199                       uint schema_name_length)
200 {
201   assert(digest_storage != NULL);
202 
203   if (statements_digest_stat_array == NULL)
204     return NULL;
205 
206   if (digest_storage->m_byte_count <= 0)
207     return NULL;
208 
209   LF_PINS *pins= get_digest_hash_pins(thread);
210   if (unlikely(pins == NULL))
211     return NULL;
212 
213   /*
214     Note: the LF_HASH key is a block of memory,
215     make sure to clean unused bytes,
216     so that memcmp() can compare keys.
217   */
218   PFS_digest_key hash_key;
219   memset(& hash_key, 0, sizeof(hash_key));
220   /* Compute MD5 Hash of the tokens received. */
221   compute_digest_md5(digest_storage, hash_key.m_md5);
222   memcpy((void*)& digest_storage->m_md5, &hash_key.m_md5, MD5_HASH_SIZE);
223   /* Add the current schema to the key */
224   hash_key.m_schema_name_length= schema_name_length;
225   if (schema_name_length > 0)
226     memcpy(hash_key.m_schema_name, schema_name, schema_name_length);
227 
228   int res;
229   uint retry_count= 0;
230   const uint retry_max= 3;
231   size_t safe_index;
232   size_t attempts= 0;
233   PFS_statements_digest_stat **entry;
234   PFS_statements_digest_stat *pfs= NULL;
235   pfs_dirty_state dirty_state;
236 
237   ulonglong now= my_micro_time();
238 
239 search:
240 
241   /* Lookup LF_HASH using this new key. */
242   entry= reinterpret_cast<PFS_statements_digest_stat**>
243     (lf_hash_search(&digest_hash, pins,
244                     &hash_key, sizeof(PFS_digest_key)));
245 
246   if (entry && (entry != MY_ERRPTR))
247   {
248     /* If digest already exists, update stats and return. */
249     pfs= *entry;
250     pfs->m_last_seen= now;
251     lf_hash_search_unpin(pins);
252     return & pfs->m_stat;
253   }
254 
255   lf_hash_search_unpin(pins);
256 
257   if (digest_full)
258   {
259     /* digest_stat array is full. Add stat at index 0 and return. */
260     pfs= &statements_digest_stat_array[0];
261     digest_lost++;
262 
263     if (pfs->m_first_seen == 0)
264       pfs->m_first_seen= now;
265     pfs->m_last_seen= now;
266     return & pfs->m_stat;
267   }
268 
269   while (++attempts <= digest_max)
270   {
271     safe_index= PFS_atomic::add_u32(& digest_monotonic_index.m_u32, 1) % digest_max;
272     if (safe_index == 0)
273     {
274       /* Record [0] is reserved. */
275       continue;
276     }
277 
278     /* Add a new record in digest stat array. */
279     assert(safe_index < digest_max);
280     pfs= &statements_digest_stat_array[safe_index];
281 
282     if (pfs->m_lock.is_free())
283     {
284       if (pfs->m_lock.free_to_dirty(& dirty_state))
285       {
286         /* Copy digest hash/LF Hash search key. */
287         memcpy(& pfs->m_digest_key, &hash_key, sizeof(PFS_digest_key));
288 
289         /*
290           Copy digest storage to statement_digest_stat_array so that it could be
291           used later to generate digest text.
292         */
293         pfs->m_digest_storage.copy(digest_storage);
294 
295         pfs->m_first_seen= now;
296         pfs->m_last_seen= now;
297 
298         res= lf_hash_insert(&digest_hash, pins, &pfs);
299         if (likely(res == 0))
300         {
301           pfs->m_lock.dirty_to_allocated(& dirty_state);
302           return & pfs->m_stat;
303         }
304 
305         pfs->m_lock.dirty_to_free(& dirty_state);
306 
307         if (res > 0)
308         {
309           /* Duplicate insert by another thread */
310           if (++retry_count > retry_max)
311           {
312             /* Avoid infinite loops */
313             digest_lost++;
314             return NULL;
315           }
316           goto search;
317         }
318 
319         /* OOM in lf_hash_insert */
320         digest_lost++;
321         return NULL;
322       }
323     }
324   }
325 
326   /* The digest array is now full. */
327   digest_full= true;
328   pfs= &statements_digest_stat_array[0];
329 
330   if (pfs->m_first_seen == 0)
331     pfs->m_first_seen= now;
332   pfs->m_last_seen= now;
333   return & pfs->m_stat;
334 }
335 
purge_digest(PFS_thread * thread,PFS_digest_key * hash_key)336 void purge_digest(PFS_thread* thread, PFS_digest_key *hash_key)
337 {
338   LF_PINS *pins= get_digest_hash_pins(thread);
339   if (unlikely(pins == NULL))
340     return;
341 
342   PFS_statements_digest_stat **entry;
343 
344   /* Lookup LF_HASH using this new key. */
345   entry= reinterpret_cast<PFS_statements_digest_stat**>
346     (lf_hash_search(&digest_hash, pins,
347                     hash_key, sizeof(PFS_digest_key)));
348 
349   if (entry && (entry != MY_ERRPTR))
350   {
351     lf_hash_delete(&digest_hash, pins,
352                    hash_key, sizeof(PFS_digest_key));
353   }
354   lf_hash_search_unpin(pins);
355   return;
356 }
357 
reset_data(unsigned char * token_array,size_t length)358 void PFS_statements_digest_stat::reset_data(unsigned char *token_array, size_t length)
359 {
360   pfs_dirty_state dirty_state;
361   m_lock.set_dirty(& dirty_state);
362   m_digest_storage.reset(token_array, length);
363   m_stat.reset();
364   m_first_seen= 0;
365   m_last_seen= 0;
366   m_lock.dirty_to_free(& dirty_state);
367 }
368 
reset_index(PFS_thread * thread)369 void PFS_statements_digest_stat::reset_index(PFS_thread *thread)
370 {
371   /* Only remove entries that exists in the HASH index. */
372   if (m_digest_storage.m_byte_count > 0)
373   {
374     purge_digest(thread, & m_digest_key);
375   }
376 }
377 
reset_esms_by_digest()378 void reset_esms_by_digest()
379 {
380   uint index;
381 
382   if (statements_digest_stat_array == NULL)
383     return;
384 
385   PFS_thread *thread= PFS_thread::get_current_thread();
386   if (unlikely(thread == NULL))
387     return;
388 
389   /* Reset statements_digest_stat_array. */
390   for (index= 0; index < digest_max; index++)
391   {
392     statements_digest_stat_array[index].reset_index(thread);
393     statements_digest_stat_array[index].reset_data(statements_digest_token_array + index * pfs_max_digest_length, pfs_max_digest_length);
394   }
395 
396   /* Mark record[0] as allocated again. */
397   statements_digest_stat_array[0].m_lock.set_allocated();
398 
399   /*
400     Reset index which indicates where the next calculated digest information
401     to be inserted in statements_digest_stat_array.
402   */
403   PFS_atomic::store_u32(& digest_monotonic_index.m_u32, 1);
404   digest_full= false;
405 }
406 
407