1 /* Copyright (c) 2008, 2021, Oracle and/or its affiliates.
2 
3   This program is free software; you can redistribute it and/or modify
4   it under the terms of the GNU General Public License, version 2.0,
5   as published by the Free Software Foundation.
6 
7   This program is also distributed with certain software (including
8   but not limited to OpenSSL) that is licensed under separate terms,
9   as designated in a particular file or component or in included license
10   documentation.  The authors of MySQL hereby grant you an additional
11   permission to link the program and your derivative works with the
12   separately licensed software that they have included with MySQL.
13 
14   This program is distributed in the hope that it will be useful,
15   but WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   GNU General Public License, version 2.0, for more details.
18 
19   You should have received a copy of the GNU General Public License
20   along with this program; if not, write to the Free Software Foundation,
21   51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
22 
23 /**
  @file storage/perfschema/pfs_digest.cc
25   Statement Digest data structures (implementation).
26 */
27 
28 /*
29   This code needs extra visibility in the lexer structures
30 */
31 
32 #define MYSQL_LEX 1
33 
34 #include "my_global.h"
35 #include "my_sys.h"
36 #include "pfs_instr.h"
37 #include "pfs_digest.h"
38 #include "pfs_global.h"
39 #include "pfs_builtin_memory.h"
40 #include "table_helper.h"
41 #include "sql_lex.h"
42 #include "sql_signal.h"
43 #include "sql_get_diagnostics.h"
44 #include "sql_string.h"
45 #include <string.h>
46 
47 size_t digest_max= 0;
48 ulong digest_lost= 0;
49 
/** EVENTS_STATEMENTS_SUMMARY_BY_DIGEST buffer (one record per digest). */
51 PFS_statements_digest_stat *statements_digest_stat_array= NULL;
52 static unsigned char *statements_digest_token_array= NULL;
53 /** Consumer flag for table EVENTS_STATEMENTS_SUMMARY_BY_DIGEST. */
54 bool flag_statements_digest= true;
/**
  Monotonic counter used to pick the index in the stat array where the
  next new record is to be inserted.
  Index 0 is reserved for the "all else" case, used when the entire
  array is full.
*/
59 static PFS_ALIGNED PFS_cacheline_uint32 digest_monotonic_index;
60 
61 bool digest_full= false;
62 
63 LF_HASH digest_hash;
64 static bool digest_hash_inited= false;
65 
/**
  Initialize table EVENTS_STATEMENTS_SUMMARY_BY_DIGEST.
  @param param performance schema sizing
  @return 0 on success, 1 if a memory allocation failed
*/
init_digest(const PFS_global_param * param)70 int init_digest(const PFS_global_param *param)
71 {
72   /*
73     Allocate memory for statements_digest_stat_array based on
74     performance_schema_digests_size values
75   */
76   digest_max= param->m_digest_sizing;
77   digest_lost= 0;
78   PFS_atomic::store_u32(& digest_monotonic_index.m_u32, 1);
79   digest_full= false;
80 
81   if (digest_max == 0)
82     return 0;
83 
84   statements_digest_stat_array=
85     PFS_MALLOC_ARRAY(& builtin_memory_digest,
86                      digest_max,
87                      sizeof(PFS_statements_digest_stat), PFS_statements_digest_stat,
88                      MYF(MY_ZEROFILL));
89 
90   if (unlikely(statements_digest_stat_array == NULL))
91   {
92     cleanup_digest();
93     return 1;
94   }
95 
96   if (pfs_max_digest_length > 0)
97   {
98     /* Size of each digest array. */
99     size_t digest_memory_size= pfs_max_digest_length * sizeof(unsigned char);
100 
101     statements_digest_token_array=
102       PFS_MALLOC_ARRAY(& builtin_memory_digest_tokens,
103                        digest_max,
104                        digest_memory_size,
105                        unsigned char,
106                        MYF(MY_ZEROFILL));
107 
108     if (unlikely(statements_digest_token_array == NULL))
109     {
110       cleanup_digest();
111       return 1;
112     }
113   }
114 
115   for (size_t index= 0; index < digest_max; index++)
116   {
117     statements_digest_stat_array[index].reset_data(statements_digest_token_array
118                                                    + index * pfs_max_digest_length, pfs_max_digest_length);
119   }
120 
121   /* Set record[0] as allocated. */
122   statements_digest_stat_array[0].m_lock.set_allocated();
123 
124   /* Set record[0] as allocated. */
125   statements_digest_stat_array[0].m_lock.set_allocated();
126 
127   return 0;
128 }
129 
130 /** Cleanup table EVENTS_STATEMENTS_SUMMARY_BY_DIGEST. */
cleanup_digest(void)131 void cleanup_digest(void)
132 {
133   PFS_FREE_ARRAY(& builtin_memory_digest,
134                  digest_max,
135                  sizeof(PFS_statements_digest_stat),
136                  statements_digest_stat_array);
137 
138   PFS_FREE_ARRAY(& builtin_memory_digest_tokens,
139                  digest_max,
140                  (pfs_max_digest_length * sizeof(unsigned char)),
141                  statements_digest_token_array);
142 
143   statements_digest_stat_array= NULL;
144   statements_digest_token_array= NULL;
145 }
146 
147 C_MODE_START
/**
  LF_HASH callback, extracts the key of a hash element.
  Each element stored in the digest hash is a pointer to a
  PFS_statements_digest_stat, and its key is the record's m_digest_key.
  @param entry the hash element
  @param [out] length set to the size of the key
  @return the address of the key
*/
static uchar *digest_hash_get_key(const uchar *entry, size_t *length,
                                  my_bool)
{
  const PFS_statements_digest_stat * const *element=
    reinterpret_cast<const PFS_statements_digest_stat*const*>(entry);
  assert(element != NULL);

  const PFS_statements_digest_stat *stat= *element;
  assert(stat != NULL);

  *length= sizeof (PFS_digest_key);

  /* The callback signature is not const correct, hence the const_cast. */
  const void *key= & stat->m_digest_key;
  return const_cast<uchar*> (reinterpret_cast<const uchar*> (key));
}
162 C_MODE_END
163 
164 
/**
  Initialize the digest hash.
  @param param performance schema sizing
  @return 0 on success
*/
init_digest_hash(const PFS_global_param * param)169 int init_digest_hash(const PFS_global_param *param)
170 {
171   if ((! digest_hash_inited) && (param->m_digest_sizing != 0))
172   {
173     lf_hash_init(&digest_hash, sizeof(PFS_statements_digest_stat*),
174                  LF_HASH_UNIQUE, 0, 0, digest_hash_get_key,
175                  &my_charset_bin);
176     digest_hash_inited= true;
177   }
178   return 0;
179 }
180 
cleanup_digest_hash(void)181 void cleanup_digest_hash(void)
182 {
183   if (digest_hash_inited)
184   {
185     lf_hash_destroy(&digest_hash);
186     digest_hash_inited= false;
187   }
188 }
189 
get_digest_hash_pins(PFS_thread * thread)190 static LF_PINS* get_digest_hash_pins(PFS_thread *thread)
191 {
192   if (unlikely(thread->m_digest_hash_pins == NULL))
193   {
194     if (!digest_hash_inited)
195       return NULL;
196     thread->m_digest_hash_pins= lf_hash_get_pins(&digest_hash);
197   }
198   return thread->m_digest_hash_pins;
199 }
200 
201 PFS_statement_stat*
find_or_create_digest(PFS_thread * thread,const sql_digest_storage * digest_storage,const char * schema_name,uint schema_name_length)202 find_or_create_digest(PFS_thread *thread,
203                       const sql_digest_storage *digest_storage,
204                       const char *schema_name,
205                       uint schema_name_length)
206 {
207   assert(digest_storage != NULL);
208 
209   if (statements_digest_stat_array == NULL)
210     return NULL;
211 
212   if (digest_storage->m_byte_count <= 0)
213     return NULL;
214 
215   LF_PINS *pins= get_digest_hash_pins(thread);
216   if (unlikely(pins == NULL))
217     return NULL;
218 
219   /*
220     Note: the LF_HASH key is a block of memory,
221     make sure to clean unused bytes,
222     so that memcmp() can compare keys.
223   */
224   PFS_digest_key hash_key;
225   memset(& hash_key, 0, sizeof(hash_key));
226   /* Compute MD5 Hash of the tokens received. */
227   compute_digest_md5(digest_storage, hash_key.m_md5);
228   memcpy((void*)& digest_storage->m_md5, &hash_key.m_md5, MD5_HASH_SIZE);
229   /* Add the current schema to the key */
230   hash_key.m_schema_name_length= schema_name_length;
231   if (schema_name_length > 0)
232     memcpy(hash_key.m_schema_name, schema_name, schema_name_length);
233 
234   int res;
235   uint retry_count= 0;
236   const uint retry_max= 3;
237   size_t safe_index;
238   size_t attempts= 0;
239   PFS_statements_digest_stat **entry;
240   PFS_statements_digest_stat *pfs= NULL;
241   pfs_dirty_state dirty_state;
242 
243   ulonglong now= my_hrtime().val;
244 
245 search:
246 
247   /* Lookup LF_HASH using this new key. */
248   entry= reinterpret_cast<PFS_statements_digest_stat**>
249     (lf_hash_search(&digest_hash, pins,
250                     &hash_key, sizeof(PFS_digest_key)));
251 
252   if (entry && (entry != MY_ERRPTR))
253   {
254     /* If digest already exists, update stats and return. */
255     pfs= *entry;
256     pfs->m_last_seen= now;
257     lf_hash_search_unpin(pins);
258     return & pfs->m_stat;
259   }
260 
261   lf_hash_search_unpin(pins);
262 
263   if (digest_full)
264   {
265     /* digest_stat array is full. Add stat at index 0 and return. */
266     pfs= &statements_digest_stat_array[0];
267     digest_lost++;
268 
269     if (pfs->m_first_seen == 0)
270       pfs->m_first_seen= now;
271     pfs->m_last_seen= now;
272     return & pfs->m_stat;
273   }
274 
275   while (++attempts <= digest_max)
276   {
277     safe_index= PFS_atomic::add_u32(& digest_monotonic_index.m_u32, 1) % digest_max;
278     if (safe_index == 0)
279     {
280       /* Record [0] is reserved. */
281       continue;
282     }
283 
284     /* Add a new record in digest stat array. */
285     assert(safe_index < digest_max);
286     pfs= &statements_digest_stat_array[safe_index];
287 
288     if (pfs->m_lock.is_free())
289     {
290       if (pfs->m_lock.free_to_dirty(& dirty_state))
291       {
292         /* Copy digest hash/LF Hash search key. */
293         memcpy(& pfs->m_digest_key, &hash_key, sizeof(PFS_digest_key));
294 
295         /*
296           Copy digest storage to statement_digest_stat_array so that it could be
297           used later to generate digest text.
298         */
299         pfs->m_digest_storage.copy(digest_storage);
300 
301         pfs->m_first_seen= now;
302         pfs->m_last_seen= now;
303 
304         res= lf_hash_insert(&digest_hash, pins, &pfs);
305         if (likely(res == 0))
306         {
307           pfs->m_lock.dirty_to_allocated(& dirty_state);
308           return & pfs->m_stat;
309         }
310 
311         pfs->m_lock.dirty_to_free(& dirty_state);
312 
313         if (res > 0)
314         {
315           /* Duplicate insert by another thread */
316           if (++retry_count > retry_max)
317           {
318             /* Avoid infinite loops */
319             digest_lost++;
320             return NULL;
321           }
322           goto search;
323         }
324 
325         /* OOM in lf_hash_insert */
326         digest_lost++;
327         return NULL;
328       }
329     }
330   }
331 
332   /* The digest array is now full. */
333   digest_full= true;
334   pfs= &statements_digest_stat_array[0];
335 
336   if (pfs->m_first_seen == 0)
337     pfs->m_first_seen= now;
338   pfs->m_last_seen= now;
339   return & pfs->m_stat;
340 }
341 
purge_digest(PFS_thread * thread,PFS_digest_key * hash_key)342 void purge_digest(PFS_thread* thread, PFS_digest_key *hash_key)
343 {
344   LF_PINS *pins= get_digest_hash_pins(thread);
345   if (unlikely(pins == NULL))
346     return;
347 
348   PFS_statements_digest_stat **entry;
349 
350   /* Lookup LF_HASH using this new key. */
351   entry= reinterpret_cast<PFS_statements_digest_stat**>
352     (lf_hash_search(&digest_hash, pins,
353                     hash_key, sizeof(PFS_digest_key)));
354 
355   if (entry && (entry != MY_ERRPTR))
356   {
357     lf_hash_delete(&digest_hash, pins,
358                    hash_key, sizeof(PFS_digest_key));
359   }
360   lf_hash_search_unpin(pins);
361   return;
362 }
363 
reset_data(unsigned char * token_array,size_t length)364 void PFS_statements_digest_stat::reset_data(unsigned char *token_array, size_t length)
365 {
366   pfs_dirty_state dirty_state;
367   m_lock.set_dirty(& dirty_state);
368   m_digest_storage.reset(token_array, length);
369   m_stat.reset();
370   m_first_seen= 0;
371   m_last_seen= 0;
372   m_lock.dirty_to_free(& dirty_state);
373 }
374 
reset_index(PFS_thread * thread)375 void PFS_statements_digest_stat::reset_index(PFS_thread *thread)
376 {
377   /* Only remove entries that exists in the HASH index. */
378   if (m_digest_storage.m_byte_count > 0)
379   {
380     purge_digest(thread, & m_digest_key);
381   }
382 }
383 
reset_esms_by_digest()384 void reset_esms_by_digest()
385 {
386   uint index;
387 
388   if (statements_digest_stat_array == NULL)
389     return;
390 
391   PFS_thread *thread= PFS_thread::get_current_thread();
392   if (unlikely(thread == NULL))
393     return;
394 
395   /* Reset statements_digest_stat_array. */
396   for (index= 0; index < digest_max; index++)
397   {
398     statements_digest_stat_array[index].reset_index(thread);
399     statements_digest_stat_array[index].reset_data(statements_digest_token_array + index * pfs_max_digest_length, pfs_max_digest_length);
400   }
401 
402   /* Mark record[0] as allocated again. */
403   statements_digest_stat_array[0].m_lock.set_allocated();
404 
405   /*
406     Reset index which indicates where the next calculated digest information
407     to be inserted in statements_digest_stat_array.
408   */
409   PFS_atomic::store_u32(& digest_monotonic_index.m_u32, 1);
410   digest_full= false;
411 }
412 
413