1 /*
2    Copyright (c) 2007, 2013, Oracle and/or its affiliates.
3    Copyright (c) 2008, 2020, MariaDB
4 
5    This program is free software; you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation; version 2 of the License.
8 
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13 
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA
17 */
18 
19 /*
  Functions to authenticate and handle requests for a connection
21 */
22 
23 #include "mariadb.h"
24 #include "mysqld.h"
25 #include "sql_priv.h"
26 #ifndef __WIN__
27 #include <netdb.h>        // getservbyname, servent
28 #endif
29 #include "sql_audit.h"
30 #include "sql_connect.h"
31 #include "thread_cache.h"
32 #include "probes_mysql.h"
33 #include "sql_parse.h"                          // sql_command_flags,
34                                                 // execute_init_command,
35                                                 // do_command
36 #include "sql_db.h"                             // mysql_change_db
37 #include "hostname.h" // inc_host_errors, ip_to_hostname,
38                       // reset_host_errors
39 #include "sql_callback.h"
40 
41 #ifdef WITH_WSREP
42 #include "wsrep_trans_observer.h" /* wsrep open/close */
43 #include "wsrep_mysqld.h"
44 #endif /* WITH_WSREP */
45 #include "proxy_protocol.h"
46 #include <ssl_compat.h>
47 
48 HASH global_user_stats, global_client_stats, global_table_stats;
49 HASH global_index_stats;
50 /* Protects the above global stats */
51 extern mysql_mutex_t LOCK_global_user_client_stats;
52 extern mysql_mutex_t LOCK_global_table_stats;
53 extern mysql_mutex_t LOCK_global_index_stats;
54 extern vio_keepalive_opts opt_vio_keepalive;
55 
56 /*
57   Get structure for logging connection data for the current user
58 */
59 
60 #ifndef NO_EMBEDDED_ACCESS_CHECKS
61 static HASH hash_user_connections;
62 
get_or_create_user_conn(THD * thd,const char * user,const char * host,const USER_RESOURCES * mqh)63 int get_or_create_user_conn(THD *thd, const char *user,
64                             const char *host,
65                             const USER_RESOURCES *mqh)
66 {
67   int return_val= 0;
68   size_t temp_len, user_len;
69   char temp_user[USER_HOST_BUFF_SIZE];
70   struct  user_conn *uc;
71 
72   DBUG_ASSERT(user != 0);
73   DBUG_ASSERT(host != 0);
74   DBUG_ASSERT(thd->user_connect == 0);
75 
76   user_len= strlen(user);
77   temp_len= (strmov(strmov(temp_user, user)+1, host) - temp_user)+1;
78   mysql_mutex_lock(&LOCK_user_conn);
79   if (!(uc = (struct  user_conn *) my_hash_search(&hash_user_connections,
80 					       (uchar*) temp_user, temp_len)))
81   {
82     /* First connection for user; Create a user connection object */
83     if (!(uc= ((struct user_conn*)
84 	       my_malloc(key_memory_user_conn,
85                          sizeof(struct user_conn) + temp_len+1, MYF(MY_WME)))))
86     {
87       /* MY_WME ensures an error is set in THD. */
88       return_val= 1;
89       goto end;
90     }
91     uc->user=(char*) (uc+1);
92     memcpy(uc->user,temp_user,temp_len+1);
93     uc->host= uc->user + user_len +  1;
94     uc->len= (uint)temp_len;
95     uc->connections= uc->questions= uc->updates= uc->conn_per_hour= 0;
96     uc->reset_utime= thd->thr_create_utime;
97     if (my_hash_insert(&hash_user_connections, (uchar*) uc))
98     {
99       /* The only possible error is out of memory, MY_WME sets an error. */
100       my_free(uc);
101       return_val= 1;
102       goto end;
103     }
104   }
105   uc->user_resources= *mqh;
106   thd->user_connect=uc;
107   uc->connections++;
108 end:
109   mysql_mutex_unlock(&LOCK_user_conn);
110   return return_val;
111 }
112 
113 
114 /*
115   check if user has already too many connections
116 
117   SYNOPSIS
118   check_for_max_user_connections()
119   thd			Thread handle
120   uc			User connect object
121 
122   NOTES
123     If check fails, we decrease user connection count, which means one
124     shouldn't call decrease_user_connections() after this function.
125 
126   RETURN
127     0	ok
128     1	error
129 */
130 
check_for_max_user_connections(THD * thd,USER_CONN * uc)131 int check_for_max_user_connections(THD *thd, USER_CONN *uc)
132 {
133   int error= 1;
134   Host_errors errors;
135   DBUG_ENTER("check_for_max_user_connections");
136 
137   mysql_mutex_lock(&LOCK_user_conn);
138 
139   /* Root is not affected by the value of max_user_connections */
140   if (global_system_variables.max_user_connections &&
141       !uc->user_resources.user_conn &&
142       global_system_variables.max_user_connections < uc->connections &&
143       !(thd->security_ctx->master_access & PRIV_IGNORE_MAX_USER_CONNECTIONS))
144   {
145     my_error(ER_TOO_MANY_USER_CONNECTIONS, MYF(0), uc->user);
146     error=1;
147     errors.m_max_user_connection= 1;
148     goto end;
149   }
150   time_out_user_resource_limits(thd, uc);
151   if (uc->user_resources.user_conn &&
152       uc->user_resources.user_conn < uc->connections)
153   {
154     my_error(ER_USER_LIMIT_REACHED, MYF(0), uc->user,
155              "max_user_connections",
156              (long) uc->user_resources.user_conn);
157     error= 1;
158     errors.m_max_user_connection= 1;
159     goto end;
160   }
161   if (uc->user_resources.conn_per_hour &&
162       uc->user_resources.conn_per_hour <= uc->conn_per_hour)
163   {
164     my_error(ER_USER_LIMIT_REACHED, MYF(0), uc->user,
165              "max_connections_per_hour",
166              (long) uc->user_resources.conn_per_hour);
167     error=1;
168     errors.m_max_user_connection_per_hour= 1;
169     goto end;
170   }
171   uc->conn_per_hour++;
172   error= 0;
173 
174 end:
175   if (unlikely(error))
176   {
177     uc->connections--; // no need for decrease_user_connections() here
178     /*
179       The thread may returned back to the pool and assigned to a user
180       that doesn't have a limit. Ensure the user is not using resources
181       of someone else.
182     */
183     thd->user_connect= NULL;
184   }
185   mysql_mutex_unlock(&LOCK_user_conn);
186   if (unlikely(error))
187   {
188     inc_host_errors(thd->main_security_ctx.ip, &errors);
189   }
190   DBUG_RETURN(error);
191 }
192 
193 
194 /*
195   Decrease user connection count
196 
197   SYNOPSIS
198     decrease_user_connections()
199     uc			User connection object
200 
201   NOTES
202     If there is a n user connection object for a connection
203     (which only happens if 'max_user_connections' is defined or
204     if someone has created a resource grant for a user), then
205     the connection count is always incremented on connect.
206 
207     The user connect object is not freed if some users has
208     'max connections per hour' defined as we need to be able to hold
209     count over the lifetime of the connection.
210 */
211 
decrease_user_connections(USER_CONN * uc)212 void decrease_user_connections(USER_CONN *uc)
213 {
214   DBUG_ENTER("decrease_user_connections");
215   mysql_mutex_lock(&LOCK_user_conn);
216   DBUG_ASSERT(uc->connections);
217   if (!--uc->connections && !mqh_used)
218   {
219     /* Last connection for user; Delete it */
220     (void) my_hash_delete(&hash_user_connections,(uchar*) uc);
221   }
222   mysql_mutex_unlock(&LOCK_user_conn);
223   DBUG_VOID_RETURN;
224 }
225 
226 
227 /*
228   Reset per-hour user resource limits when it has been more than
229   an hour since they were last checked
230 
231   SYNOPSIS:
232     time_out_user_resource_limits()
233     thd			Thread handler
234     uc			User connection details
235 
236   NOTE:
237     This assumes that the LOCK_user_conn mutex has been acquired, so it is
238     safe to test and modify members of the USER_CONN structure.
239 */
240 
time_out_user_resource_limits(THD * thd,USER_CONN * uc)241 void time_out_user_resource_limits(THD *thd, USER_CONN *uc)
242 {
243   ulonglong check_time= thd->start_utime;
244   DBUG_ENTER("time_out_user_resource_limits");
245 
246   /* If more than a hour since last check, reset resource checking */
247   if (check_time  - uc->reset_utime >= 3600000000ULL)
248   {
249     uc->questions=0;
250     uc->updates=0;
251     uc->conn_per_hour=0;
252     uc->reset_utime= check_time;
253   }
254 
255   DBUG_VOID_RETURN;
256 }
257 
258 /*
259   Check if maximum queries per hour limit has been reached
260   returns 0 if OK.
261 */
262 
check_mqh(THD * thd,uint check_command)263 bool check_mqh(THD *thd, uint check_command)
264 {
265   bool error= 0;
266   USER_CONN *uc=thd->user_connect;
267   DBUG_ENTER("check_mqh");
268   DBUG_ASSERT(uc != 0);
269 
270   mysql_mutex_lock(&LOCK_user_conn);
271 
272   time_out_user_resource_limits(thd, uc);
273 
274   /* Check that we have not done too many questions / hour */
275   if (uc->user_resources.questions &&
276       uc->questions++ >= uc->user_resources.questions)
277   {
278     my_error(ER_USER_LIMIT_REACHED, MYF(0), uc->user, "max_queries_per_hour",
279              (long) uc->user_resources.questions);
280     error=1;
281     goto end;
282   }
283   if (check_command < (uint) SQLCOM_END)
284   {
285     /* Check that we have not done too many updates / hour */
286     if (uc->user_resources.updates &&
287         (sql_command_flags[check_command] & CF_CHANGES_DATA) &&
288 	uc->updates++ >= uc->user_resources.updates)
289     {
290       my_error(ER_USER_LIMIT_REACHED, MYF(0), uc->user, "max_updates_per_hour",
291                (long) uc->user_resources.updates);
292       error=1;
293       goto end;
294     }
295   }
296 end:
297   mysql_mutex_unlock(&LOCK_user_conn);
298   DBUG_RETURN(error);
299 }
300 
301 #endif /* NO_EMBEDDED_ACCESS_CHECKS */
302 
303 /*
304   Check for maximum allowable user connections, if the mysqld server is
  started with corresponding variable that is greater than 0.
306 */
307 
get_key_conn(user_conn * buff,size_t * length,my_bool not_used)308 extern "C" uchar *get_key_conn(user_conn *buff, size_t *length,
309 			      my_bool not_used __attribute__((unused)))
310 {
311   *length= buff->len;
312   return (uchar*) buff->user;
313 }
314 
315 
free_user(struct user_conn * uc)316 extern "C" void free_user(struct user_conn *uc)
317 {
318   my_free(uc);
319 }
320 
321 
init_max_user_conn(void)322 void init_max_user_conn(void)
323 {
324 #ifndef NO_EMBEDDED_ACCESS_CHECKS
325   my_hash_init(key_memory_user_conn, &hash_user_connections,
326                system_charset_info, max_connections, 0, 0, (my_hash_get_key)
327                get_key_conn, (my_hash_free_key) free_user, 0);
328 #endif
329 }
330 
331 
free_max_user_conn(void)332 void free_max_user_conn(void)
333 {
334 #ifndef NO_EMBEDDED_ACCESS_CHECKS
335   my_hash_free(&hash_user_connections);
336 #endif /* NO_EMBEDDED_ACCESS_CHECKS */
337 }
338 
339 
reset_mqh(LEX_USER * lu,bool get_them=0)340 void reset_mqh(LEX_USER *lu, bool get_them= 0)
341 {
342 #ifndef NO_EMBEDDED_ACCESS_CHECKS
343   mysql_mutex_lock(&LOCK_user_conn);
344   if (lu)  // for GRANT
345   {
346     USER_CONN *uc;
347     size_t temp_len=lu->user.length+lu->host.length+2;
348     char temp_user[USER_HOST_BUFF_SIZE];
349 
350     memcpy(temp_user,lu->user.str,lu->user.length);
351     memcpy(temp_user+lu->user.length+1,lu->host.str,lu->host.length);
352     temp_user[lu->user.length]='\0'; temp_user[temp_len-1]=0;
353     if ((uc = (struct  user_conn *) my_hash_search(&hash_user_connections,
354                                                    (uchar*) temp_user,
355                                                    temp_len)))
356     {
357       uc->questions=0;
358       get_mqh(temp_user,&temp_user[lu->user.length+1],uc);
359       uc->updates=0;
360       uc->conn_per_hour=0;
361     }
362   }
363   else
364   {
365     /* for FLUSH PRIVILEGES and FLUSH USER_RESOURCES */
366     for (uint idx=0;idx < hash_user_connections.records; idx++)
367     {
368       USER_CONN *uc=(struct user_conn *)
369         my_hash_element(&hash_user_connections, idx);
370       if (get_them)
371 	get_mqh(uc->user,uc->host,uc);
372       uc->questions=0;
373       uc->updates=0;
374       uc->conn_per_hour=0;
375     }
376   }
377   mysql_mutex_unlock(&LOCK_user_conn);
378 #endif /* NO_EMBEDDED_ACCESS_CHECKS */
379 }
380 
381 /*****************************************************************************
382  Handle users statistics
383 *****************************************************************************/
384 
/* Placeholder user name used when no user is defined for a THD. */
static const char mysql_system_user[]= "#mysql_system#";

/*
  Return 'user' unchanged when it is not NULL, otherwise the
  'mysql_system_user' placeholder.
*/
static const char *get_valid_user_string(const char *user)
{
  if (user)
    return user;
  return mysql_system_user;
}
393 
394 /*
395   Returns string as 'IP' for the client-side of the connection represented by
396   'client'. Does not allocate memory. May return "".
397 */
398 
get_client_host(THD * client)399 static const char *get_client_host(THD *client)
400 {
401   return client->security_ctx->host_or_ip[0] ?
402     client->security_ctx->host_or_ip :
403     client->security_ctx->host ? client->security_ctx->host : "";
404 }
405 
get_key_user_stats(USER_STATS * user_stats,size_t * length,my_bool not_used)406 extern "C" uchar *get_key_user_stats(USER_STATS *user_stats, size_t *length,
407                                      my_bool not_used __attribute__((unused)))
408 {
409   *length= user_stats->user_name_length;
410   return (uchar*) user_stats->user;
411 }
412 
free_user_stats(USER_STATS * user_stats)413 void free_user_stats(USER_STATS* user_stats)
414 {
415   my_free(user_stats);
416 }
417 
init_user_stats(USER_STATS * user_stats,const char * user,size_t user_length,const char * priv_user,uint total_connections,uint total_ssl_connections,uint concurrent_connections,time_t connected_time,double busy_time,double cpu_time,ulonglong bytes_received,ulonglong bytes_sent,ulonglong binlog_bytes_written,ha_rows rows_sent,ha_rows rows_read,ha_rows rows_inserted,ha_rows rows_deleted,ha_rows rows_updated,ulonglong select_commands,ulonglong update_commands,ulonglong other_commands,ulonglong commit_trans,ulonglong rollback_trans,ulonglong denied_connections,ulonglong lost_connections,ulonglong max_statement_time_exceeded,ulonglong access_denied_errors,ulonglong empty_queries)418 void init_user_stats(USER_STATS *user_stats,
419                      const char *user,
420                      size_t user_length,
421                      const char *priv_user,
422                      uint total_connections,
423                      uint total_ssl_connections,
424                      uint concurrent_connections,
425                      time_t connected_time,
426                      double busy_time,
427                      double cpu_time,
428                      ulonglong bytes_received,
429                      ulonglong bytes_sent,
430                      ulonglong binlog_bytes_written,
431                      ha_rows rows_sent,
432                      ha_rows rows_read,
433                      ha_rows rows_inserted,
434                      ha_rows rows_deleted,
435                      ha_rows rows_updated,
436                      ulonglong select_commands,
437                      ulonglong update_commands,
438                      ulonglong other_commands,
439                      ulonglong commit_trans,
440                      ulonglong rollback_trans,
441                      ulonglong denied_connections,
442                      ulonglong lost_connections,
443                      ulonglong max_statement_time_exceeded,
444                      ulonglong access_denied_errors,
445                      ulonglong empty_queries)
446 {
447   DBUG_ENTER("init_user_stats");
448   DBUG_PRINT("enter", ("user: %s  priv_user: %s", user, priv_user));
449 
450   user_length= MY_MIN(user_length, sizeof(user_stats->user)-1);
451   memcpy(user_stats->user, user, user_length);
452   user_stats->user[user_length]= 0;
453   user_stats->user_name_length= (uint)user_length;
454   strmake_buf(user_stats->priv_user, priv_user);
455 
456   user_stats->total_connections= total_connections;
457   user_stats->total_ssl_connections=  total_ssl_connections;
458   user_stats->concurrent_connections= concurrent_connections;
459   user_stats->connected_time= connected_time;
460   user_stats->busy_time= busy_time;
461   user_stats->cpu_time= cpu_time;
462   user_stats->bytes_received= bytes_received;
463   user_stats->bytes_sent= bytes_sent;
464   user_stats->binlog_bytes_written= binlog_bytes_written;
465   user_stats->rows_sent= rows_sent;
466   user_stats->rows_read= rows_read;
467   user_stats->rows_inserted= rows_inserted;
468   user_stats->rows_deleted= rows_deleted;
469   user_stats->rows_updated= rows_updated;
470   user_stats->select_commands= select_commands;
471   user_stats->update_commands= update_commands;
472   user_stats->other_commands= other_commands;
473   user_stats->commit_trans= commit_trans;
474   user_stats->rollback_trans= rollback_trans;
475   user_stats->denied_connections= denied_connections;
476   user_stats->lost_connections= lost_connections;
477   user_stats->max_statement_time_exceeded= max_statement_time_exceeded;
478   user_stats->access_denied_errors= access_denied_errors;
479   user_stats->empty_queries= empty_queries;
480   DBUG_VOID_RETURN;
481 }
482 
483 
init_global_user_stats(void)484 void init_global_user_stats(void)
485 {
486   my_hash_init(PSI_INSTRUMENT_ME, &global_user_stats, system_charset_info, max_connections,
487                0, 0, (my_hash_get_key) get_key_user_stats,
488                (my_hash_free_key) free_user_stats, 0);
489 }
490 
init_global_client_stats(void)491 void init_global_client_stats(void)
492 {
493   my_hash_init(PSI_INSTRUMENT_ME, &global_client_stats, system_charset_info, max_connections,
494                0, 0, (my_hash_get_key) get_key_user_stats,
495                (my_hash_free_key) free_user_stats, 0);
496 }
497 
get_key_table_stats(TABLE_STATS * table_stats,size_t * length,my_bool not_used)498 extern "C" uchar *get_key_table_stats(TABLE_STATS *table_stats, size_t *length,
499                                       my_bool not_used __attribute__((unused)))
500 {
501   *length= table_stats->table_name_length;
502   return (uchar*) table_stats->table;
503 }
504 
free_table_stats(TABLE_STATS * table_stats)505 extern "C" void free_table_stats(TABLE_STATS* table_stats)
506 {
507   my_free(table_stats);
508 }
509 
init_global_table_stats(void)510 void init_global_table_stats(void)
511 {
512   my_hash_init(PSI_INSTRUMENT_ME, &global_table_stats, system_charset_info,
513                max_connections, 0, 0, (my_hash_get_key) get_key_table_stats,
514                (my_hash_free_key) free_table_stats, 0);
515 }
516 
get_key_index_stats(INDEX_STATS * index_stats,size_t * length,my_bool not_used)517 extern "C" uchar *get_key_index_stats(INDEX_STATS *index_stats, size_t *length,
518                                      my_bool not_used __attribute__((unused)))
519 {
520   *length= index_stats->index_name_length;
521   return (uchar*) index_stats->index;
522 }
523 
free_index_stats(INDEX_STATS * index_stats)524 extern "C" void free_index_stats(INDEX_STATS* index_stats)
525 {
526   my_free(index_stats);
527 }
528 
init_global_index_stats(void)529 void init_global_index_stats(void)
530 {
531   my_hash_init(PSI_INSTRUMENT_ME, &global_index_stats, system_charset_info,
532                max_connections, 0, 0, (my_hash_get_key) get_key_index_stats,
533                (my_hash_free_key) free_index_stats, 0);
534 }
535 
536 
free_global_user_stats(void)537 void free_global_user_stats(void)
538 {
539   my_hash_free(&global_user_stats);
540 }
541 
free_global_table_stats(void)542 void free_global_table_stats(void)
543 {
544   my_hash_free(&global_table_stats);
545 }
546 
free_global_index_stats(void)547 void free_global_index_stats(void)
548 {
549   my_hash_free(&global_index_stats);
550 }
551 
free_global_client_stats(void)552 void free_global_client_stats(void)
553 {
554   my_hash_free(&global_client_stats);
555 }
556 
557 /*
558   Increments the global stats connection count for an entry from
559   global_client_stats or global_user_stats. Returns 0 on success
560   and 1 on error.
561 */
562 
increment_count_by_name(const char * name,size_t name_length,const char * role_name,HASH * users_or_clients,THD * thd)563 static bool increment_count_by_name(const char *name, size_t name_length,
564                                    const char *role_name,
565                                    HASH *users_or_clients, THD *thd)
566 {
567   USER_STATS *user_stats;
568 
569   if (!(user_stats= (USER_STATS*) my_hash_search(users_or_clients, (uchar*) name,
570                                               name_length)))
571   {
572     /* First connection for this user or client */
573     if (!(user_stats= ((USER_STATS*)
574                        my_malloc(PSI_INSTRUMENT_ME, sizeof(USER_STATS),
575                                  MYF(MY_WME | MY_ZEROFILL)))))
576       return TRUE;                              // Out of memory
577 
578     init_user_stats(user_stats, name, name_length, role_name,
579                     0, 0, 0,   // connections
580                     0, 0, 0,   // time
581                     0, 0, 0,   // bytes sent, received and written
582                     0, 0,      // rows sent and read
583                     0, 0, 0,   // rows inserted, deleted and updated
584                     0, 0, 0,   // select, update and other commands
585                     0, 0,      // commit and rollback trans
586                     thd->status_var.access_denied_errors,
587                     0,         // lost connections
588                     0,         // max query timeouts
589                     0,         // access denied errors
590                     0);        // empty queries
591 
592     if (my_hash_insert(users_or_clients, (uchar*)user_stats))
593     {
594       my_free(user_stats);
595       return TRUE;                              // Out of memory
596     }
597   }
598   user_stats->total_connections++;
599   if (thd->net.vio && thd->net.vio->type == VIO_TYPE_SSL)
600     user_stats->total_ssl_connections++;
601   return FALSE;
602 }
603 
604 
605 /*
606   Increments the global user and client stats connection count.
607 
608   @param use_lock  if true, LOCK_global_user_client_stats will be locked
609 
610   @retval 0 ok
611   @retval 1 error.
612 */
613 
614 #ifndef EMBEDDED_LIBRARY
increment_connection_count(THD * thd,bool use_lock)615 static bool increment_connection_count(THD* thd, bool use_lock)
616 {
617   const char *user_string= get_valid_user_string(thd->main_security_ctx.user);
618   const char *client_string= get_client_host(thd);
619   bool return_value= FALSE;
620 
621   if (!thd->userstat_running)
622     return FALSE;
623 
624   if (use_lock)
625     mysql_mutex_lock(&LOCK_global_user_client_stats);
626 
627   if (increment_count_by_name(user_string, strlen(user_string), user_string,
628                               &global_user_stats, thd))
629   {
630     return_value= TRUE;
631     goto end;
632   }
633   if (increment_count_by_name(client_string, strlen(client_string),
634                               user_string, &global_client_stats, thd))
635   {
636     return_value= TRUE;
637     goto end;
638   }
639 
640 end:
641   if (use_lock)
642     mysql_mutex_unlock(&LOCK_global_user_client_stats);
643   return return_value;
644 }
645 #endif
646 
647 /*
648   Used to update the global user and client stats
649 */
650 
update_global_user_stats_with_user(THD * thd,USER_STATS * user_stats,time_t now)651 static void update_global_user_stats_with_user(THD *thd,
652                                                USER_STATS *user_stats,
653                                                time_t now)
654 {
655   DBUG_ASSERT(thd->userstat_running);
656 
657   user_stats->connected_time+= now - thd->last_global_update_time;
658   user_stats->busy_time+=  (thd->status_var.busy_time -
659                             thd->org_status_var.busy_time);
660   user_stats->cpu_time+=   (thd->status_var.cpu_time -
661                             thd->org_status_var.cpu_time);
662   /*
663     This is handle specially as bytes_received is incremented BEFORE
664     org_status_var is copied.
665   */
666   user_stats->bytes_received+= (thd->org_status_var.bytes_received-
667                                 thd->start_bytes_received);
668   user_stats->bytes_sent+= (thd->status_var.bytes_sent -
669                             thd->org_status_var.bytes_sent);
670   user_stats->binlog_bytes_written+=
671     (thd->status_var.binlog_bytes_written -
672      thd->org_status_var.binlog_bytes_written);
673   /* We are not counting rows in internal temporary tables here ! */
674   user_stats->rows_read+=      (thd->status_var.rows_read -
675                                 thd->org_status_var.rows_read);
676   user_stats->rows_sent+=      (thd->status_var.rows_sent -
677                                 thd->org_status_var.rows_sent);
678   user_stats->rows_inserted+=  (thd->status_var.ha_write_count -
679                                 thd->org_status_var.ha_write_count);
680   user_stats->rows_deleted+=   (thd->status_var.ha_delete_count -
681                                 thd->org_status_var.ha_delete_count);
682   user_stats->rows_updated+=   (thd->status_var.ha_update_count -
683                                 thd->org_status_var.ha_update_count);
684   user_stats->select_commands+= thd->select_commands;
685   user_stats->update_commands+= thd->update_commands;
686   user_stats->other_commands+=  thd->other_commands;
687   user_stats->commit_trans+=   (thd->status_var.ha_commit_count -
688                                 thd->org_status_var.ha_commit_count);
689   user_stats->rollback_trans+= (thd->status_var.ha_rollback_count +
690                                 thd->status_var.ha_savepoint_rollback_count -
691                                 thd->org_status_var.ha_rollback_count -
692                                 thd->org_status_var.
693                                 ha_savepoint_rollback_count);
694   user_stats->access_denied_errors+=
695     (thd->status_var.access_denied_errors -
696      thd->org_status_var.access_denied_errors);
697   user_stats->empty_queries+=   (thd->status_var.empty_queries -
698                                  thd->org_status_var.empty_queries);
699 
700   /* The following can only contain 0 or 1 and then connection ends */
701   user_stats->denied_connections+= thd->status_var.access_denied_errors;
702   user_stats->lost_connections+=   thd->status_var.lost_connections;
703   user_stats->max_statement_time_exceeded+= thd->status_var.max_statement_time_exceeded;
704 }
705 
706 
707 /*  Updates the global stats of a user or client */
update_global_user_stats(THD * thd,bool create_user,time_t now)708 void update_global_user_stats(THD *thd, bool create_user, time_t now)
709 {
710   const char *user_string, *client_string;
711   USER_STATS *user_stats;
712   size_t user_string_length, client_string_length;
713   DBUG_ASSERT(thd->userstat_running);
714 
715   user_string= get_valid_user_string(thd->main_security_ctx.user);
716   user_string_length= strlen(user_string);
717   client_string= get_client_host(thd);
718   client_string_length= strlen(client_string);
719 
720   mysql_mutex_lock(&LOCK_global_user_client_stats);
721 
722   // Update by user name
723   if ((user_stats= (USER_STATS*) my_hash_search(&global_user_stats,
724                                              (uchar*) user_string,
725                                              user_string_length)))
726   {
727     /* Found user. */
728     update_global_user_stats_with_user(thd, user_stats, now);
729   }
730   else
731   {
732     /* Create the entry */
733     if (create_user)
734     {
735       increment_count_by_name(user_string, user_string_length, user_string,
736                               &global_user_stats, thd);
737     }
738   }
739 
740   /* Update by client IP */
741   if ((user_stats= (USER_STATS*)my_hash_search(&global_client_stats,
742                                             (uchar*) client_string,
743                                             client_string_length)))
744   {
745     // Found by client IP
746     update_global_user_stats_with_user(thd, user_stats, now);
747   }
748   else
749   {
750     // Create the entry
751     if (create_user)
752     {
753       increment_count_by_name(client_string, client_string_length,
754                               user_string, &global_client_stats, thd);
755     }
756   }
757   /* Reset variables only used for counting */
758   thd->select_commands= thd->update_commands= thd->other_commands= 0;
759   thd->last_global_update_time= now;
760 
761   mysql_mutex_unlock(&LOCK_global_user_client_stats);
762 }
763 
764 
765 /**
766   Set thread character set variables from the given ID
767 
768   @param  thd         thread handle
769   @param  cs_number   character set and collation ID
770 
771   @retval  0  OK; character_set_client, collation_connection and
772               character_set_results are set to the new value,
773               or to the default global values.
774 
775   @retval  1  error, e.g. the given ID is not supported by parser.
776               Corresponding SQL error is sent.
777 */
778 
thd_init_client_charset(THD * thd,uint cs_number)779 bool thd_init_client_charset(THD *thd, uint cs_number)
780 {
781   CHARSET_INFO *cs;
782   /*
783    Use server character set and collation if
784    - opt_character_set_client_handshake is not set
785    - client has not specified a character set
786    - client character set doesn't exists in server
787   */
788   if (!opt_character_set_client_handshake ||
789       !(cs= get_charset(cs_number, MYF(0))))
790   {
791     thd->update_charset(global_system_variables.character_set_client,
792                         global_system_variables.collation_connection,
793                         global_system_variables.character_set_results);
794   }
795   else
796   {
797     if (!is_supported_parser_charset(cs))
798     {
799       /* Disallow non-supported parser character sets: UCS2, UTF16, UTF32 */
800       my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "character_set_client",
801                cs->csname);
802       return true;
803     }
804     thd->org_charset= cs;
805     thd->update_charset(cs,cs,cs);
806   }
807   return false;
808 }
809 
810 
811 /*
812   Initialize connection threads
813 */
814 
815 #ifndef EMBEDDED_LIBRARY
init_new_connection_handler_thread()816 bool init_new_connection_handler_thread()
817 {
818   pthread_detach_this_thread();
819   if (my_thread_init())
820   {
821     statistic_increment(aborted_connects,&LOCK_status);
822     statistic_increment(connection_errors_internal, &LOCK_status);
823     return 1;
824   }
825   DBUG_EXECUTE_IF("simulate_failed_connection_1", return(1); );
826   return 0;
827 }
828 
829 /**
830   Set client address during authentication.
831 
832   Initializes THD::main_security_ctx and THD::peer_port.
833   Optionally does ip to hostname translation.
834 
835   @param thd   current THD handle
836   @param addr  peer address (can be NULL, if 'ip' is set)
837   @param ip    peer address as string (can be NULL if 'addr' is set)
838   @param port  peer port
839   @param check_proxy_networks if true, and host is in
840                'proxy_protocol_networks' list, skip
841                "host not privileged" check
842   @param[out] host_errors - number of connect
843               errors for this host
844 
845   @retval 0 ok, 1 error
846 */
thd_set_peer_addr(THD * thd,sockaddr_storage * addr,const char * ip,uint port,bool check_proxy_networks,uint * host_errors)847 int thd_set_peer_addr(THD *thd,
848   sockaddr_storage *addr,
849   const char *ip,
850   uint port,
851   bool check_proxy_networks,
852   uint *host_errors)
853 {
854   *host_errors= 0;
855 
856   thd->peer_port= port;
857 
858   char ip_string[128];
859   if (!ip)
860   {
861     void *addr_data;
862     if (addr->ss_family == AF_UNIX)
863     {
864         /* local connection */
865         my_free((void *)thd->main_security_ctx.ip);
866         thd->main_security_ctx.host_or_ip= thd->main_security_ctx.host = my_localhost;
867         thd->main_security_ctx.ip= 0;
868         return 0;
869     }
870     else if (addr->ss_family == AF_INET)
871       addr_data= &((struct sockaddr_in *)addr)->sin_addr;
872     else
873       addr_data= &((struct sockaddr_in6 *)addr)->sin6_addr;
874     if (!inet_ntop(addr->ss_family,addr_data, ip_string, sizeof(ip_string)))
875     {
876       DBUG_ASSERT(0);
877       return 1;
878     }
879     ip= ip_string;
880   }
881 
882   my_free((void *)thd->main_security_ctx.ip);
883   if (!(thd->main_security_ctx.ip = my_strdup(PSI_INSTRUMENT_ME, ip, MYF(MY_WME))))
884   {
885     /*
886     No error accounting per IP in host_cache,
887     this is treated as a global server OOM error.
888     TODO: remove the need for my_strdup.
889     */
890     statistic_increment(aborted_connects, &LOCK_status);
891     statistic_increment(connection_errors_internal, &LOCK_status);
892     return 1; /* The error is set by my_strdup(). */
893   }
894   thd->main_security_ctx.host_or_ip = thd->main_security_ctx.ip;
895   if (!(specialflag & SPECIAL_NO_RESOLVE))
896   {
897     int rc;
898 
899     rc = ip_to_hostname(addr,
900       thd->main_security_ctx.ip,
901       &thd->main_security_ctx.host,
902       host_errors);
903 
904     /* Cut very long hostnames to avoid possible overflows */
905     if (thd->main_security_ctx.host)
906     {
907       if (thd->main_security_ctx.host != my_localhost)
908         ((char*)thd->main_security_ctx.host)[MY_MIN(strlen(thd->main_security_ctx.host),
909           HOSTNAME_LENGTH)] = 0;
910       thd->main_security_ctx.host_or_ip = thd->main_security_ctx.host;
911     }
912 
913     if (rc == RC_BLOCKED_HOST)
914     {
915       /* HOST_CACHE stats updated by ip_to_hostname(). */
916       my_error(ER_HOST_IS_BLOCKED, MYF(0), thd->main_security_ctx.host_or_ip);
917       return 1;
918     }
919   }
920   DBUG_PRINT("info", ("Host: %s  ip: %s",
921     (thd->main_security_ctx.host ?
922       thd->main_security_ctx.host : "unknown host"),
923       (thd->main_security_ctx.ip ?
924         thd->main_security_ctx.ip : "unknown ip")));
925   if ((!check_proxy_networks || !is_proxy_protocol_allowed((struct sockaddr *) addr))
926       && acl_check_host(thd->main_security_ctx.host, thd->main_security_ctx.ip))
927   {
928     /* HOST_CACHE stats updated by acl_check_host(). */
929     my_error(ER_HOST_NOT_PRIVILEGED, MYF(0),
930       thd->main_security_ctx.host_or_ip);
931     return 1;
932   }
933   return 0;
934 }
935 
/*
  Perform handshake, authorize client and update thd ACL variables.

  SYNOPSIS
    check_connection()
    thd  thread handle

  DESCRIPTION
    When thd->main_security_ctx.host is not set, the peer address is read
    with vio_peer_addr() and stored through thd_set_peer_addr().
    Otherwise the preset host is reused as host_or_ip, ip is cleared and
    net->vio->remote is zeroed.
    After that keepalive is enabled on the vio, thd->packet is allocated
    and acl_authenticate() is called.

  RETURN
     0  success, thd is updated.
     non-zero  error: 1 for failures detected in this function, otherwise
               the value returned by acl_authenticate()
*/

static int check_connection(THD *thd)
{
  /* Filled in by thd_set_peer_addr(); stays 0 when that call is skipped */
  uint connect_errors= 0;
  int auth_rc;
  NET *net= &thd->net;

  DBUG_PRINT("info",
             ("New connection received on %s", vio_description(net->vio)));

#ifdef SIGNAL_WITH_VIO_CLOSE
  thd->set_active_vio(net->vio);
#endif

  if (!thd->main_security_ctx.host)         // If TCP/IP connection
  {
    my_bool peer_rc;
    char ip[NI_MAXHOST];
    uint16 peer_port;

    peer_rc= vio_peer_addr(net->vio, ip, &peer_port, NI_MAXHOST);

    /*
    ===========================================================================
    DEBUG code only (begin)
    Simulate various output from vio_peer_addr().
    ===========================================================================
    */

    DBUG_EXECUTE_IF("vio_peer_addr_error",
                    {
                      peer_rc= 1;
                    }
                    );
    DBUG_EXECUTE_IF("vio_peer_addr_fake_ipv4",
                    {
                      struct sockaddr *sa= (sockaddr *) &net->vio->remote;
                      sa->sa_family= AF_INET;
                      struct in_addr *ip4= &((struct sockaddr_in *) sa)->sin_addr;
                      /* See RFC 5737, 192.0.2.0/24 is reserved. */
                      const char* fake= "192.0.2.4";
                      inet_pton(AF_INET,fake, ip4);
                      strcpy(ip, fake);
                      peer_rc= 0;
                    }
                    );

#ifdef HAVE_IPV6
    DBUG_EXECUTE_IF("vio_peer_addr_fake_ipv6",
                    {
                      struct sockaddr_in6 *sa= (sockaddr_in6 *) &net->vio->remote;
                      sa->sin6_family= AF_INET6;
                      struct in6_addr *ip6= & sa->sin6_addr;
                      /* See RFC 3849, ipv6 2001:DB8::/32 is reserved. */
                      const char* fake= "2001:db8::6:6";
                      /* inet_pton(AF_INET6, fake, ip6); not available on Windows XP. */
                      ip6->s6_addr[ 0] = 0x20;
                      ip6->s6_addr[ 1] = 0x01;
                      ip6->s6_addr[ 2] = 0x0d;
                      ip6->s6_addr[ 3] = 0xb8;
                      ip6->s6_addr[ 4] = 0x00;
                      ip6->s6_addr[ 5] = 0x00;
                      ip6->s6_addr[ 6] = 0x00;
                      ip6->s6_addr[ 7] = 0x00;
                      ip6->s6_addr[ 8] = 0x00;
                      ip6->s6_addr[ 9] = 0x00;
                      ip6->s6_addr[10] = 0x00;
                      ip6->s6_addr[11] = 0x00;
                      ip6->s6_addr[12] = 0x00;
                      ip6->s6_addr[13] = 0x06;
                      ip6->s6_addr[14] = 0x00;
                      ip6->s6_addr[15] = 0x06;
                      strcpy(ip, fake);
                      peer_rc= 0;
                    }
                    );
#endif /* HAVE_IPV6 */

    /*
    ===========================================================================
    DEBUG code only (end)
    ===========================================================================
    */

    if (peer_rc)
    {
      /*
        Since we can not even get the peer IP address,
        there is nothing to show in the host_cache,
        so increment the global status variable for peer address errors.
      */
      statistic_increment(connection_errors_peer_addr, &LOCK_status);
      my_error(ER_BAD_HOST_ERROR, MYF(0));
      statistic_increment(aborted_connects_preauth, &LOCK_status);
      return 1;
    }

    /*
      Store the address in the security context. 'true' requests the
      proxy-network exception of the "host not privileged" check.
    */
    if (thd_set_peer_addr(thd, &net->vio->remote, ip, peer_port,
                          true, &connect_errors))
    {
      statistic_increment(aborted_connects_preauth, &LOCK_status);
      return 1;
    }
  }
  else /* Hostname given means that the connection was on a socket */
  {
    DBUG_PRINT("info",("Host: %s", thd->main_security_ctx.host));
    thd->main_security_ctx.host_or_ip= thd->main_security_ctx.host;
    thd->main_security_ctx.ip= 0;
    /* Reset sin_addr */
    bzero((char*) &net->vio->remote, sizeof(net->vio->remote));
  }
  /* Keepalive is enabled for both kinds of connection */
  vio_keepalive(net->vio, TRUE);
  vio_set_keepalive_options(net->vio, &opt_vio_keepalive);

  if (unlikely(thd->packet.alloc(thd->variables.net_buffer_length)))
  {
    /*
      Important note:
      net_buffer_length is a SESSION variable,
      so it may be tempting to account OOM conditions per IP in the HOST_CACHE,
      in case some clients are more demanding than others ...
      However, this session variable is *not* initialized with a per client
      value during the initial connection, it is initialized from the
      GLOBAL net_buffer_length variable from the server.
      Hence, there is no reason to account on OOM conditions per client IP,
      we count failures in the global server status instead.
    */
    statistic_increment(aborted_connects,&LOCK_status);
    statistic_increment(connection_errors_internal, &LOCK_status);
    statistic_increment(aborted_connects_preauth, &LOCK_status);
    return 1; /* The error is set by alloc(). */
  }

  auth_rc= acl_authenticate(thd, 0);
  if (auth_rc == 0 && connect_errors != 0)
  {
    /*
      A client connection from this IP was successful,
      after some previous failures.
      Reset the connection error counter.
    */
    reset_host_connect_errors(thd->main_security_ctx.ip);
  }

  return auth_rc;
}
1094 
1095 
/*
  Setup thread to be used with the current thread

  SYNOPSIS
    void setup_connection_thread_globals()
    thd    Thread/connection handler

  NOTES
    The function only forwards to thd->store_globals().

  RETURN
    Nothing; the function is declared void and reports no error itself.
*/

void setup_connection_thread_globals(THD *thd)
{
  thd->store_globals();
}
1113 
1114 
1115 /*
1116   Autenticate user, with error reporting
1117 
1118   SYNOPSIS
1119    login_connection()
1120    thd        Thread handler
1121 
1122   NOTES
1123     Connection is not closed in case of errors
1124 
1125   RETURN
1126     0    ok
1127     1    error
1128 */
1129 
login_connection(THD * thd)1130 bool login_connection(THD *thd)
1131 {
1132   NET *net= &thd->net;
1133   int error= 0;
1134   DBUG_ENTER("login_connection");
1135   DBUG_PRINT("info", ("login_connection called by thread %lu",
1136                       (ulong) thd->thread_id));
1137 
1138   /* Use "connect_timeout" value during connection phase */
1139   my_net_set_read_timeout(net, connect_timeout);
1140   my_net_set_write_timeout(net, connect_timeout);
1141 
1142   error= check_connection(thd);
1143   thd->protocol->end_statement();
1144 
1145   if (unlikely(error))
1146   {						// Wrong permissions
1147 #ifdef _WIN32
1148     if (vio_type(net->vio) == VIO_TYPE_NAMEDPIPE)
1149       my_sleep(1000);				/* must wait after eof() */
1150 #endif
1151     statistic_increment(aborted_connects,&LOCK_status);
1152     error=1;
1153     goto exit;
1154   }
1155   /* Connect completed, set read/write timeouts back to default */
1156   my_net_set_read_timeout(net, thd->variables.net_read_timeout);
1157   my_net_set_write_timeout(net, thd->variables.net_write_timeout);
1158 
1159   /*  Updates global user connection stats. */
1160   if (increment_connection_count(thd, TRUE))
1161   {
1162     my_error(ER_OUTOFMEMORY, MYF(0), (int) (2*sizeof(USER_STATS)));
1163     error= 1;
1164     goto exit;
1165   }
1166 
1167 exit:
1168   mysql_audit_notify_connection_connect(thd);
1169   DBUG_RETURN(error);
1170 }
1171 
1172 
1173 /*
1174   Close an established connection
1175 
1176   NOTES
1177     This mainly updates status variables
1178 */
1179 
end_connection(THD * thd)1180 void end_connection(THD *thd)
1181 {
1182   NET *net= &thd->net;
1183 
1184 #ifdef WITH_WSREP
1185   if (thd->wsrep_cs().state() == wsrep::client_state::s_exec)
1186   {
1187     /* Error happened after the thread acquired ownership to wsrep
1188        client state, but before command was processed. Clean up the
1189        state before wsrep_close(). */
1190     wsrep_after_command_ignore_result(thd);
1191   }
1192   wsrep_close(thd);
1193 #endif /* WITH_WSREP */
1194   if (thd->user_connect)
1195   {
1196     /*
1197       We decrease this variable early to make it easy to log again quickly.
1198       This code is not critical as we will in any case do this test
1199       again in thd->cleanup()
1200     */
1201     decrease_user_connections(thd->user_connect);
1202     /*
1203       The thread may returned back to the pool and assigned to a user
1204       that doesn't have a limit. Ensure the user is not using resources
1205       of someone else.
1206     */
1207     thd->user_connect= NULL;
1208   }
1209 
1210   if (unlikely(thd->killed) || (net->error && net->vio != 0))
1211   {
1212     statistic_increment(aborted_threads,&LOCK_status);
1213     status_var_increment(thd->status_var.lost_connections);
1214   }
1215 
1216   if (likely(!thd->killed) && (net->error && net->vio != 0))
1217     thd->print_aborted_warning(1, thd->get_stmt_da()->is_error()
1218              ? thd->get_stmt_da()->message() : ER_THD(thd, ER_UNKNOWN_ERROR));
1219 }
1220 
1221 
1222 /*
1223   Initialize THD to handle queries
1224 */
1225 
prepare_new_connection_state(THD * thd)1226 void prepare_new_connection_state(THD* thd)
1227 {
1228   Security_context *sctx= thd->security_ctx;
1229 
1230   if (thd->client_capabilities & CLIENT_COMPRESS)
1231     thd->net.compress=1;				// Use compression
1232 
1233   /*
1234     Much of this is duplicated in create_embedded_thd() for the
1235     embedded server library.
1236     TODO: refactor this to avoid code duplication there
1237   */
1238   thd->proc_info= 0;
1239   thd->set_command(COM_SLEEP);
1240   thd->init_for_queries();
1241 
1242   if (opt_init_connect.length &&
1243       !(sctx->master_access & PRIV_IGNORE_INIT_CONNECT))
1244   {
1245     execute_init_command(thd, &opt_init_connect, &LOCK_sys_init_connect);
1246     if (unlikely(thd->is_error()))
1247     {
1248       Host_errors errors;
1249       thd->set_killed(KILL_CONNECTION);
1250       thd->print_aborted_warning(0, "init_connect command failed");
1251       sql_print_warning("%s", thd->get_stmt_da()->message());
1252 
1253       /*
1254         now let client to send its first command,
1255         to be able to send the error back
1256       */
1257       NET *net= &thd->net;
1258       thd->lex->current_select= 0;
1259       my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
1260       thd->clear_error();
1261       net_new_transaction(net);
1262       ulong packet_length= my_net_read(net);
1263       /*
1264         If my_net_read() failed, my_error() has been already called,
1265         and the main Diagnostics Area contains an error condition.
1266       */
1267       if (packet_length != packet_error)
1268         my_error(ER_NEW_ABORTING_CONNECTION, MYF(0),
1269                  thd->thread_id,
1270                  thd->db.str ? thd->db.str : "unconnected",
1271                  sctx->user ? sctx->user : "unauthenticated",
1272                  sctx->host_or_ip, "init_connect command failed");
1273       thd->server_status&= ~SERVER_STATUS_CLEAR_SET;
1274       thd->protocol->end_statement();
1275       thd->killed = KILL_CONNECTION;
1276       errors.m_init_connect= 1;
1277       inc_host_errors(thd->main_security_ctx.ip, &errors);
1278       return;
1279     }
1280 
1281     thd->proc_info=0;
1282   }
1283 }
1284 
1285 
1286 /*
1287   Thread handler for a connection
1288 
1289   SYNOPSIS
1290     handle_one_connection()
1291     arg		Connection object (THD)
1292 
1293   IMPLEMENTATION
1294     This function (normally) does the following:
1295     - Initialize thread
1296     - Initialize THD to be used with this thread
1297     - Authenticate user
1298     - Execute all queries sent on the connection
1299     - Take connection down
1300     - End thread  / Handle next connection using thread from thread cache
1301 */
1302 
handle_one_connection(void * arg)1303 pthread_handler_t handle_one_connection(void *arg)
1304 {
1305   CONNECT *connect= (CONNECT*) arg;
1306 
1307   mysql_thread_set_psi_id(connect->thread_id);
1308 
1309   if (init_new_connection_handler_thread())
1310     connect->close_with_error(0, 0, ER_OUT_OF_RESOURCES);
1311   else
1312     do_handle_one_connection(connect, true);
1313 
1314   DBUG_PRINT("info", ("killing thread"));
1315 #if defined(HAVE_OPENSSL) && !defined(EMBEDDED_LIBRARY)
1316   ERR_remove_state(0);
1317 #endif
1318   my_thread_end();
1319   return 0;
1320 }
1321 
thd_prepare_connection(THD * thd)1322 bool thd_prepare_connection(THD *thd)
1323 {
1324   bool rc;
1325   lex_start(thd);
1326   rc= login_connection(thd);
1327   if (rc)
1328     return rc;
1329 
1330   MYSQL_CONNECTION_START(thd->thread_id, &thd->security_ctx->priv_user[0],
1331                          (char *) thd->security_ctx->host_or_ip);
1332 
1333   prepare_new_connection_state(thd);
1334 #ifdef WITH_WSREP
1335   thd->wsrep_client_thread= true;
1336   wsrep_open(thd);
1337 #endif /* WITH_WSREP */
1338   return FALSE;
1339 }
1340 
thd_is_connection_alive(THD * thd)1341 bool thd_is_connection_alive(THD *thd)
1342 {
1343   NET *net= &thd->net;
1344   if (likely(!net->error &&
1345              net->vio != 0 &&
1346              thd->killed < KILL_CONNECTION))
1347     return TRUE;
1348   return FALSE;
1349 }
1350 
1351 
do_handle_one_connection(CONNECT * connect,bool put_in_cache)1352 void do_handle_one_connection(CONNECT *connect, bool put_in_cache)
1353 {
1354   ulonglong thr_create_utime= microsecond_interval_timer();
1355   THD *thd;
1356   if (!(thd= connect->create_thd(NULL)))
1357   {
1358     connect->close_and_delete();
1359     return;
1360   }
1361 
1362   DBUG_EXECUTE_IF("CONNECT_wait",
1363   {
1364     extern MYSQL_SOCKET unix_sock;
1365     DBUG_ASSERT(unix_sock.fd >= 0);
1366     while (unix_sock.fd >= 0)
1367       my_sleep(1000);
1368   });
1369 
1370   /*
1371     If a thread was created to handle this connection:
1372     increment slow_launch_threads counter if it took more than
1373     slow_launch_time seconds to create the thread.
1374   */
1375 
1376   if (connect->prior_thr_create_utime)
1377   {
1378     ulong launch_time= (ulong) (thr_create_utime -
1379                                 connect->prior_thr_create_utime);
1380     if (launch_time >= slow_launch_time*1000000L)
1381       statistic_increment(slow_launch_threads, &LOCK_status);
1382   }
1383 
1384   server_threads.insert(thd); // Make THD visible in show processlist
1385 
1386   delete connect; // must be after server_threads.insert, see close_connections()
1387 
1388   thd->thr_create_utime= thr_create_utime;
1389   /* We need to set this because of time_out_user_resource_limits */
1390   thd->start_utime= thr_create_utime;
1391 
1392   /*
1393     handle_one_connection() is normally the only way a thread would
1394     start and would always be on the very high end of the stack ,
1395     therefore, the thread stack always starts at the address of the
1396     first local variable of handle_one_connection, which is thd. We
1397     need to know the start of the stack so that we could check for
1398     stack overruns.
1399   */
1400   thd->thread_stack= (char*) &thd;
1401   setup_connection_thread_globals(thd);
1402 
1403   for (;;)
1404   {
1405     bool create_user= TRUE;
1406 
1407     mysql_socket_set_thread_owner(thd->net.vio->mysql_socket);
1408     if (thd_prepare_connection(thd))
1409     {
1410       create_user= FALSE;
1411       goto end_thread;
1412     }
1413 
1414     while (thd_is_connection_alive(thd))
1415     {
1416       if (mysql_audit_release_required(thd))
1417         mysql_audit_release(thd);
1418       if (do_command(thd))
1419 	break;
1420     }
1421     end_connection(thd);
1422 
1423 end_thread:
1424     close_connection(thd);
1425 
1426     if (thd->userstat_running)
1427       update_global_user_stats(thd, create_user, time(NULL));
1428 
1429     unlink_thd(thd);
1430     if (IF_WSREP(thd->wsrep_applier, false) || !put_in_cache ||
1431         !(connect= thread_cache.park()))
1432       break;
1433 
1434     /* Create new instrumentation for the new THD job */
1435     PSI_CALL_set_thread(PSI_CALL_new_thread(key_thread_one_connection, thd,
1436                                             thd->thread_id));
1437 
1438     if (!(connect->create_thd(thd)))
1439     {
1440       /* Out of resources. Free thread to get more resources */
1441       connect->close_and_delete();
1442       break;
1443     }
1444     delete connect;
1445 
1446     /*
1447       We have to call store_globals to update mysys_var->id and lock_info
1448       with the new thread_id
1449     */
1450     thd->store_globals();
1451 
1452     /* reset abort flag for the thread */
1453     thd->mysys_var->abort= 0;
1454     thd->thr_create_utime= microsecond_interval_timer();
1455     thd->start_utime= thd->thr_create_utime;
1456 
1457     server_threads.insert(thd);
1458   }
1459   delete thd;
1460 }
1461 #endif /* EMBEDDED_LIBRARY */
1462 
1463 
1464 /* Handling of CONNECT objects */
1465 
1466 /*
1467   Close connection without error and delete the connect object
1468   This and close_with_error are only called if we didn't manage to
1469   create a new thd object.
1470 */
1471 
close_and_delete()1472 void CONNECT::close_and_delete()
1473 {
1474   DBUG_ENTER("close_and_delete");
1475 
1476 #if _WIN32
1477   if (vio_type == VIO_TYPE_NAMEDPIPE)
1478     CloseHandle(pipe);
1479   else
1480 #endif
1481   if (vio_type != VIO_CLOSED)
1482     mysql_socket_close(sock);
1483   vio_type= VIO_CLOSED;
1484 
1485   --*scheduler->connection_count;
1486   statistic_increment(connection_errors_internal, &LOCK_status);
1487   statistic_increment(aborted_connects,&LOCK_status);
1488 
1489   delete this;
1490   DBUG_VOID_RETURN;
1491 }
1492 
1493 /*
1494   Close a connection with a possible error to the end user
1495   Alse deletes the connection object, like close_and_delete()
1496 */
1497 
close_with_error(uint sql_errno,const char * message,uint close_error)1498 void CONNECT::close_with_error(uint sql_errno,
1499                                const char *message, uint close_error)
1500 {
1501   THD *thd= create_thd(NULL);
1502   if (thd)
1503   {
1504     if (sql_errno)
1505       thd->protocol->net_send_error(thd, sql_errno, message, NULL);
1506     close_connection(thd, close_error);
1507     delete thd;
1508     set_current_thd(0);
1509   }
1510   close_and_delete();
1511 }
1512 
1513 
1514 /* Reuse or create a THD based on a CONNECT object */
1515 
create_thd(THD * thd)1516 THD *CONNECT::create_thd(THD *thd)
1517 {
1518   bool res, thd_reused= thd != 0;
1519   Vio *vio;
1520   DBUG_ENTER("create_thd");
1521 
1522   DBUG_EXECUTE_IF("simulate_failed_connection_2", DBUG_RETURN(0); );
1523 
1524   if (thd)
1525   {
1526     /* reuse old thd */
1527     thd->reset_for_reuse();
1528     /*
1529       reset tread_id's, but not thread_dbug_id's as the later isn't allowed
1530       to change as there is already structures in thd marked with the old
1531       value.
1532     */
1533     thd->thread_id= thd->variables.pseudo_thread_id= thread_id;
1534   }
1535   else if (!(thd= new THD(thread_id)))
1536     DBUG_RETURN(0);
1537 
1538 #if _WIN32
1539   if (vio_type == VIO_TYPE_NAMEDPIPE)
1540     vio= vio_new_win32pipe(pipe);
1541   else
1542 #endif
1543   vio= mysql_socket_vio_new(sock, vio_type, vio_type == VIO_TYPE_SOCKET ?
1544                                                         VIO_LOCALHOST : 0);
1545   if (!vio)
1546   {
1547     if (!thd_reused)
1548       delete thd;
1549     DBUG_RETURN(0);
1550   }
1551 
1552   set_current_thd(thd);
1553   res= my_net_init(&thd->net, vio, thd, MYF(MY_THREAD_SPECIFIC));
1554   vio_type= VIO_CLOSED;                // Vio now handled by thd
1555 
1556   if (unlikely(res || thd->is_error()))
1557   {
1558     if (!thd_reused)
1559       delete thd;
1560     set_current_thd(0);
1561     DBUG_RETURN(0);
1562   }
1563 
1564   init_net_server_extension(thd);
1565 
1566   thd->security_ctx->host= thd->net.vio->type == VIO_TYPE_NAMEDPIPE ||
1567                            thd->net.vio->type == VIO_TYPE_SOCKET ?
1568                            my_localhost : 0;
1569 
1570   thd->scheduler=          scheduler;
1571   thd->real_id= pthread_self(); /* Duplicates THD::store_globals() setting. */
1572 
1573   /* Attach PSI instrumentation to the new THD */
1574 
1575   PSI_thread *psi= PSI_CALL_get_thread();
1576   PSI_CALL_set_thread_os_id(psi);
1577   PSI_CALL_set_thread_THD(psi, thd);
1578   PSI_CALL_set_thread_id(psi, thd->thread_id);
1579   thd->set_psi(psi);
1580 
1581   DBUG_RETURN(thd);
1582 }
1583