1 /* Copyright (c) 2008, 2020, Oracle and/or its affiliates. All rights reserved.
2 
3   This program is free software; you can redistribute it and/or modify
4   it under the terms of the GNU General Public License, version 2.0,
5   as published by the Free Software Foundation.
6 
7   This program is also distributed with certain software (including
8   but not limited to OpenSSL) that is licensed under separate terms,
9   as designated in a particular file or component or in included license
10   documentation.  The authors of MySQL hereby grant you an additional
11   permission to link the program and your derivative works with the
12   separately licensed software that they have included with MySQL.
13 
14   This program is distributed in the hope that it will be useful,
15   but WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   GNU General Public License, version 2.0, for more details.
18 
19   You should have received a copy of the GNU General Public License
20   along with this program; if not, write to the Free Software
21   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 /**
24   @file storage/perfschema/table_session_connect.cc
25   TABLE SESSION_CONNECT (abstract).
26 */
27 
28 #include "storage/perfschema/table_session_connect.h"
29 
30 #include "my_compiler.h"
31 #include "my_dbug.h"
32 #include "my_inttypes.h"
33 #include "sql/field.h"
34 #include "sql/table.h"
35 #include "storage/perfschema/pfs_buffer_container.h"
36 
match(PFS_thread * pfs)37 bool PFS_index_session_connect::match(PFS_thread *pfs) {
38   if (m_fields >= 1) {
39     if (!m_key_1.match(pfs)) {
40       return false;
41     }
42   }
43 
44   return true;
45 }
46 
match(row_session_connect_attrs * row)47 bool PFS_index_session_connect::match(row_session_connect_attrs *row) {
48   if (m_fields >= 2) {
49     if (!m_key_2.match(row->m_attr_name, row->m_attr_name_length)) {
50       return false;
51     }
52   }
53 
54   return true;
55 }
56 
table_session_connect(const PFS_engine_table_share * share)57 table_session_connect::table_session_connect(
58     const PFS_engine_table_share *share)
59     : cursor_by_thread_connect_attr(share) {
60   if (session_connect_attrs_size_per_thread > 0) {
61     m_copy_session_connect_attrs = (char *)my_malloc(
62         PSI_INSTRUMENT_ME, session_connect_attrs_size_per_thread, MYF(0));
63   } else {
64     m_copy_session_connect_attrs = nullptr;
65   }
66   m_copy_session_connect_attrs_length = 0;
67 }
68 
~table_session_connect()69 table_session_connect::~table_session_connect() {
70   my_free(m_copy_session_connect_attrs);
71 }
72 
index_init(uint idx MY_ATTRIBUTE ((unused)),bool)73 int table_session_connect::index_init(uint idx MY_ATTRIBUTE((unused)), bool) {
74   DBUG_ASSERT(idx == 0);
75   m_opened_index = PFS_NEW(PFS_index_session_connect);
76   m_index = m_opened_index;
77   return 0;
78 }
79 
index_next(void)80 int table_session_connect::index_next(void) {
81   PFS_thread *thread;
82   bool has_more_thread = true;
83   int rc = 0;
84 
85   for (m_pos.set_at(&m_next_pos); has_more_thread; m_pos.next_thread()) {
86     thread = global_thread_container.get(m_pos.m_index_1, &has_more_thread);
87     if (thread != nullptr) {
88       if (m_opened_index->match(thread)) {
89         do {
90           /*
91             Here we materialize the row first,
92             and then evaluate if it matches the index.
93             This is simpler, as parsing the session attributes encoded string
94             is done only once.
95           */
96           rc = make_row(thread, m_pos.m_index_2);
97 
98           if (rc == 0) {
99             if (m_opened_index->match(&m_row)) {
100               m_next_pos.set_after(&m_pos);
101               return 0;
102             }
103             m_pos.m_index_2++;
104           }
105         } while (rc == 0);
106       }
107     }
108   }
109 
110   return HA_ERR_END_OF_FILE;
111 }
112 
113 /**
114   Take a length encoded string
115 
116   @arg ptr  inout       the input string array
117   @arg dest             where to store the result
118   @arg dest_size        max size of @c dest
119   @arg copied_len       the actual length of the data copied
120   @arg start_ptr        pointer to the start of input
121   @arg input_length     the length of the incoming data
122   @arg from_cs          character set in which @c ptr is encoded
123   @arg nchars_max       maximum number of characters to read
124   @return status
125     @retval true    parsing failed
126     @retval false   parsing succeeded
127 */
parse_length_encoded_string(const char ** ptr,char * dest,uint dest_size,uint * copied_len,const char * start_ptr,uint input_length,const CHARSET_INFO * from_cs,uint nchars_max)128 static bool parse_length_encoded_string(const char **ptr, char *dest,
129                                         uint dest_size, uint *copied_len,
130                                         const char *start_ptr,
131                                         uint input_length,
132                                         const CHARSET_INFO *from_cs,
133                                         uint nchars_max) {
134   ulong copy_length, data_length;
135   const char *well_formed_error_pos = nullptr,
136              *cannot_convert_error_pos = nullptr, *from_end_pos = nullptr;
137 
138   data_length =
139       net_field_length(const_cast<uchar **>(pointer_cast<const uchar **>(ptr)));
140 
141   /* we don't tolerate NULL as a length */
142   if (data_length == NULL_LENGTH) {
143     return true;
144   }
145 
146   if (*ptr - start_ptr + data_length > input_length) {
147     return true;
148   }
149 
150   /*
151     TODO: Migrate the data itself to UTF8MB4,
152     this is still UTF8MB3 printed in a UTF8MB4 column.
153   */
154   copy_length = well_formed_copy_nchars(
155       &my_charset_utf8_bin, dest, dest_size, from_cs, *ptr, data_length,
156       nchars_max, &well_formed_error_pos, &cannot_convert_error_pos,
157       &from_end_pos);
158   *copied_len = copy_length;
159   (*ptr) += data_length;
160 
161   return false;
162 }
163 
164 /**
165   Take the nth attribute name/value pair
166 
167   Parse the attributes blob form the beginning, skipping the attributes
168   whose number is lower than the one we seek.
169   When we reach the attribute at an index we're looking for the values
170   are copied to the output parameters.
171   If parsing fails or no more attributes are found the function stops
172   and returns an error code.
173 
174   @param connect_attrs            pointer to the connect attributes blob
175   @param connect_attrs_length     length of @c connect_attrs
176   @param connect_attrs_cs         character set used to encode @c connect_attrs
177   @param ordinal                  index of the attribute we need
178   @param [out] attr_name          buffer to receive the attribute name
179   @param max_attr_name            max size of @c attr_name in bytes
180   @param [out] attr_name_length   number of bytes written in @c attr_name
181   @param [out] attr_value         buffer to receive the attribute name
182   @param max_attr_value           max size of @c attr_value in bytes
183   @param [out] attr_value_length  number of bytes written in @c attr_value
184   @return status
185     @retval true    requested attribute pair is found and copied
186     @retval false   error. Either because of parsing or too few attributes.
187 */
read_nth_attr(const char * connect_attrs,uint connect_attrs_length,const CHARSET_INFO * connect_attrs_cs,uint ordinal,char * attr_name,uint max_attr_name,uint * attr_name_length,char * attr_value,uint max_attr_value,uint * attr_value_length)188 bool read_nth_attr(const char *connect_attrs, uint connect_attrs_length,
189                    const CHARSET_INFO *connect_attrs_cs, uint ordinal,
190                    char *attr_name, uint max_attr_name, uint *attr_name_length,
191                    char *attr_value, uint max_attr_value,
192                    uint *attr_value_length) {
193   uint idx;
194   const char *ptr;
195 
196   for (ptr = connect_attrs, idx = 0;
197        (uint)(ptr - connect_attrs) < connect_attrs_length && idx <= ordinal;
198        idx++) {
199     uint copy_length;
200 
201     /* read the key */
202     if (parse_length_encoded_string(
203             &ptr, attr_name, max_attr_name, &copy_length, connect_attrs,
204             connect_attrs_length, connect_attrs_cs, 32) ||
205         !copy_length) {
206       return false;
207     }
208 
209     if (idx == ordinal) {
210       *attr_name_length = copy_length;
211     }
212 
213     /* read the value */
214     if (parse_length_encoded_string(
215             &ptr, attr_value, max_attr_value, &copy_length, connect_attrs,
216             connect_attrs_length, connect_attrs_cs, 1024)) {
217       return false;
218     }
219 
220     if (idx == ordinal) {
221       *attr_value_length = copy_length;
222     }
223 
224     if (idx == ordinal) {
225       return true;
226     }
227   }
228 
229   return false;
230 }
231 
make_row(PFS_thread * pfs,uint ordinal)232 int table_session_connect::make_row(PFS_thread *pfs, uint ordinal) {
233   pfs_optimistic_state lock;
234   pfs_optimistic_state session_lock;
235   PFS_thread_class *safe_class;
236   const CHARSET_INFO *cs;
237 
238   /* Protect this reader against thread termination */
239   pfs->m_lock.begin_optimistic_lock(&lock);
240   /* Protect this reader against writing on session attributes */
241   pfs->m_session_lock.begin_optimistic_lock(&session_lock);
242 
243   safe_class = sanitize_thread_class(pfs->m_class);
244   if (unlikely(safe_class == nullptr)) {
245     return HA_ERR_RECORD_DELETED;
246   }
247 
248   /* Filtering threads must be done under the protection of the optimistic lock.
249    */
250   if (!thread_fits(pfs)) {
251     return HA_ERR_RECORD_DELETED;
252   }
253 
254   /* Make a safe copy of the session attributes */
255 
256   if (m_copy_session_connect_attrs == nullptr) {
257     return HA_ERR_RECORD_DELETED;
258   }
259 
260   m_copy_session_connect_attrs_length = pfs->m_session_connect_attrs_length;
261 
262   if (m_copy_session_connect_attrs_length >
263       session_connect_attrs_size_per_thread) {
264     return HA_ERR_RECORD_DELETED;
265   }
266 
267   memcpy(m_copy_session_connect_attrs, pfs->m_session_connect_attrs,
268          m_copy_session_connect_attrs_length);
269 
270   cs = get_charset(pfs->m_session_connect_attrs_cs_number, MYF(0));
271   if (cs == nullptr) {
272     return HA_ERR_RECORD_DELETED;
273   }
274 
275   if (!pfs->m_session_lock.end_optimistic_lock(&session_lock)) {
276     return HA_ERR_RECORD_DELETED;
277   }
278 
279   if (!pfs->m_lock.end_optimistic_lock(&lock)) {
280     return HA_ERR_RECORD_DELETED;
281   }
282 
283   /*
284     Now we have a safe copy of the data,
285     that will not change while parsing it
286   */
287 
288   /* populate the row */
289   if (read_nth_attr(
290           m_copy_session_connect_attrs, m_copy_session_connect_attrs_length, cs,
291           ordinal, m_row.m_attr_name, (uint)sizeof(m_row.m_attr_name),
292           &m_row.m_attr_name_length, m_row.m_attr_value,
293           (uint)sizeof(m_row.m_attr_value), &m_row.m_attr_value_length)) {
294     /* we don't expect internal threads to have connection attributes */
295     if (pfs->m_processlist_id == 0) {
296       return HA_ERR_RECORD_DELETED;
297     }
298 
299     m_row.m_ordinal_position = ordinal;
300     m_row.m_process_id = pfs->m_processlist_id;
301 
302     return 0;
303   }
304 
305   return HA_ERR_RECORD_DELETED;
306 }
307 
read_row_values(TABLE * table,unsigned char * buf,Field ** fields,bool read_all)308 int table_session_connect::read_row_values(TABLE *table, unsigned char *buf,
309                                            Field **fields, bool read_all) {
310   Field *f;
311 
312   /* Set the null bits */
313   DBUG_ASSERT(table->s->null_bytes == 1);
314   buf[0] = 0;
315 
316   for (; (f = *fields); fields++) {
317     if (read_all || bitmap_is_set(table->read_set, f->field_index())) {
318       switch (f->field_index()) {
319         case FO_PROCESS_ID:
320           if (m_row.m_process_id != 0) {
321             set_field_ulonglong(f, m_row.m_process_id);
322           } else {
323             f->set_null();
324           }
325           break;
326         case FO_ATTR_NAME:
327           set_field_varchar_utf8(f, m_row.m_attr_name,
328                                  m_row.m_attr_name_length);
329           break;
330         case FO_ATTR_VALUE:
331           if (m_row.m_attr_value_length)
332             set_field_varchar_utf8(f, m_row.m_attr_value,
333                                    m_row.m_attr_value_length);
334           else {
335             f->set_null();
336           }
337           break;
338         case FO_ORDINAL_POSITION:
339           set_field_ulong(f, m_row.m_ordinal_position);
340           break;
341         default:
342           DBUG_ASSERT(false);
343       }
344     }
345   }
346   return 0;
347 }
348 
thread_fits(PFS_thread *)349 bool table_session_connect::thread_fits(PFS_thread *) { return true; }
350