1 /* Copyright (c) 2006, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "event_scheduler.h"
24 
25 #include "events.h"
26 #include "event_data_objects.h"
27 #include "event_queue.h"
28 #include "event_db_repository.h"
29 #include "auth_common.h"             // SUPER_ACL
30 #include "log.h"
31 #include "mysqld_thd_manager.h"      // Global_THD_manager
32 #include "sql_error.h"               // Sql_condition
33 #include "sql_class.h"               // THD
34 
35 /**
36   @addtogroup Event_Scheduler
37   @{
38 */
39 
/*
  Shorthands for the Event_scheduler locking helpers. Each one forwards
  the location of the call site (function, and where applicable file and
  line) so that the helper can record it.
*/
#define LOCK_DATA()   lock_data(__func__, __LINE__)
#define UNLOCK_DATA() unlock_data(__func__, __LINE__)
#define COND_STATE_WAIT(mythd, abstime, stage)                      \
  cond_wait(mythd, abstime, stage, __func__, __FILE__, __LINE__)
44 
45 extern my_thread_attr_t connection_attrib;
46 
47 
48 Event_db_repository *Event_worker_thread::db_repository;
49 
50 
51 static
52 const LEX_STRING scheduler_states_names[] =
53 {
54   { C_STRING_WITH_LEN("INITIALIZED") },
55   { C_STRING_WITH_LEN("RUNNING") },
56   { C_STRING_WITH_LEN("STOPPING") }
57 };
58 
59 struct scheduler_param {
60   THD *thd;
61   Event_scheduler *scheduler;
62 };
63 
64 
65 /*
66   Prints the stack of infos, warnings, errors from thd to
67   the console so it can be fetched by the logs-into-tables and
68   checked later.
69 
70   SYNOPSIS
71     evex_print_warnings
72       thd  Thread used during the execution of the event
73       et   The event itself
74 */
75 
76 void
print_warnings(THD * thd,Event_job_data * et)77 Event_worker_thread::print_warnings(THD *thd, Event_job_data *et)
78 {
79   const Sql_condition *err;
80   DBUG_ENTER("evex_print_warnings");
81   if (thd->get_stmt_da()->cond_count() == 0)
82     DBUG_VOID_RETURN;
83 
84   char msg_buf[10 * STRING_BUFFER_USUAL_SIZE];
85   char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE];
86   String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info);
87   prefix.length(0);
88   prefix.append("Event Scheduler: [");
89 
90   prefix.append(et->definer.str, et->definer.length, system_charset_info);
91   prefix.append("][", 2);
92   prefix.append(et->dbname.str, et->dbname.length, system_charset_info);
93   prefix.append('.');
94   prefix.append(et->name.str, et->name.length, system_charset_info);
95   prefix.append("] ", 2);
96 
97   Diagnostics_area::Sql_condition_iterator it=
98     thd->get_stmt_da()->sql_conditions();
99   while ((err= it++))
100   {
101     String err_msg(msg_buf, sizeof(msg_buf), system_charset_info);
102     /* set it to 0 or we start adding at the end. That's the trick ;) */
103     err_msg.length(0);
104     err_msg.append(prefix);
105     err_msg.append(err->message_text(),
106                    err->message_octet_length(), system_charset_info);
107     switch (err->severity())
108     {
109     case Sql_condition::SL_ERROR:
110       sql_print_error("%*s", static_cast<int>(err_msg.length()), err_msg.c_ptr());
111       break;
112     case Sql_condition::SL_WARNING:
113       sql_print_warning("%*s", static_cast<int>(err_msg.length()), err_msg.c_ptr());
114       break;
115     case Sql_condition::SL_NOTE:
116       sql_print_information("%*s", static_cast<int>(err_msg.length()), err_msg.c_ptr());
117       break;
118     default:
119       assert(false);
120     }
121   }
122   DBUG_VOID_RETURN;
123 }
124 
125 
126 /*
127   Performs post initialization of structures in a new thread.
128 
129   SYNOPSIS
130     post_init_event_thread()
131       thd  Thread
132 
133   NOTES
134       Before this is called, one should not do any DBUG_XXX() calls.
135 
136 */
137 
138 bool
post_init_event_thread(THD * thd)139 post_init_event_thread(THD *thd)
140 {
141   if (my_thread_init() || thd->store_globals())
142   {
143     return TRUE;
144   }
145 
146   Global_THD_manager *thd_manager= Global_THD_manager::get_instance();
147   thd_manager->add_thd(thd);
148   thd_manager->inc_thread_running();
149   return FALSE;
150 }
151 
152 
153 /*
154   Cleans up the THD and the threaded environment of the thread.
155 
156   SYNOPSIS
157     deinit_event_thread()
158       thd  Thread
159 */
160 
161 void
deinit_event_thread(THD * thd)162 deinit_event_thread(THD *thd)
163 {
164   Global_THD_manager *thd_manager= Global_THD_manager::get_instance();
165   thd->proc_info= "Clearing";
166   thd->get_protocol_classic()->end_net();
167   DBUG_PRINT("exit", ("Event thread finishing"));
168 
169   thd->release_resources();
170   thd_manager->remove_thd(thd);
171   thd_manager->dec_thread_running();
172   delete thd;
173 }
174 
175 
176 /*
177   Performs pre- mysql_thread_create() initialisation of THD. Do this
178   in the thread that will pass THD to the child thread. In the
179   child thread call post_init_event_thread().
180 
181   SYNOPSIS
182     pre_init_event_thread()
183       thd  The THD of the thread. Has to be allocated by the caller.
184 
185   NOTES
186     1. The host of the thead is my_localhost
187     2. thd->net is initted with NULL - no communication.
188 */
189 
190 void
pre_init_event_thread(THD * thd)191 pre_init_event_thread(THD* thd)
192 {
193   DBUG_ENTER("pre_init_event_thread");
194   thd->security_context()->set_master_access(0);
195   thd->security_context()->set_db_access(0);
196   thd->security_context()->set_host_or_ip_ptr((char *) my_localhost,
197                                               strlen(my_localhost));
198   thd->get_protocol_classic()->init_net(NULL);
199   thd->security_context()->set_user_ptr(C_STRING_WITH_LEN("event_scheduler"));
200   thd->get_protocol_classic()->get_net()->read_timeout= slave_net_timeout;
201   thd->slave_thread= 0;
202   thd->variables.option_bits|= OPTION_AUTO_IS_NULL;
203   thd->get_protocol_classic()->set_client_capabilities(CLIENT_MULTI_RESULTS);
204 
205   thd->set_new_thread_id();
206   /*
207     Guarantees that we will see the thread in SHOW PROCESSLIST though its
208     vio is NULL.
209   */
210 
211   thd->proc_info= "Initialized";
212   thd->set_time();
213 
214   /* Do not use user-supplied timeout value for system threads. */
215   thd->variables.lock_wait_timeout= LONG_TIMEOUT;
216 
217   DBUG_VOID_RETURN;
218 }
219 
220 
221 /*
222   Function that executes the scheduler,
223 
224   SYNOPSIS
225     event_scheduler_thread()
226       arg  Pointer to `struct scheduler_param`
227 
228   RETURN VALUE
229     0  OK
230 */
231 
event_scheduler_thread(void * arg)232 extern "C" void *event_scheduler_thread(void *arg)
233 {
234   /* needs to be first for thread_stack */
235   THD *thd= ((struct scheduler_param *) arg)->thd;
236   Event_scheduler *scheduler= ((struct scheduler_param *) arg)->scheduler;
237   bool res;
238 
239   thd->thread_stack= (char *)&thd;              // remember where our stack is
240 
241   mysql_thread_set_psi_id(thd->thread_id());
242 
243   res= post_init_event_thread(thd);
244 
245   DBUG_ENTER("event_scheduler_thread");
246   my_claim(arg);
247   thd->claim_memory_ownership();
248   my_free(arg);
249   if (!res)
250     scheduler->run(thd);
251   else
252   {
253     thd->proc_info= "Clearing";
254     thd->get_protocol_classic()->end_net();
255     delete thd;
256   }
257 
258   DBUG_LEAVE;                               // Against gcc warnings
259   my_thread_end();
260   return 0;
261 }
262 
263 
264 /**
265   Function that executes an event in a child thread. Setups the
266   environment for the event execution and cleans after that.
267 
268   SYNOPSIS
269     event_worker_thread()
270       arg  The Event_job_data object to be processed
271 
272   RETURN VALUE
273     0  OK
274 */
275 
event_worker_thread(void * arg)276 extern "C" void *event_worker_thread(void *arg)
277 {
278   THD *thd;
279   Event_queue_element_for_exec *event= (Event_queue_element_for_exec *)arg;
280 
281   event->claim_memory_ownership();
282 
283   thd= event->thd;
284 
285   thd->claim_memory_ownership();
286 
287   mysql_thread_set_psi_id(thd->thread_id());
288 
289   Event_worker_thread worker_thread;
290   worker_thread.run(thd, event);
291 
292   my_thread_end();
293   return 0;                                     // Can't return anything here
294 }
295 
296 
297 /**
298   Function that executes an event in a child thread. Setups the
299   environment for the event execution and cleans after that.
300 
301   SYNOPSIS
302     Event_worker_thread::run()
303       thd    Thread context
304       event  The Event_queue_element_for_exec object to be processed
305 */
306 
307 void
run(THD * thd,Event_queue_element_for_exec * event)308 Event_worker_thread::run(THD *thd, Event_queue_element_for_exec *event)
309 {
310   /* needs to be first for thread_stack */
311   char my_stack;
312   Event_job_data job_data;
313   bool res;
314 
315   assert(thd->m_digest == NULL);
316 
317 #ifdef HAVE_PSI_STATEMENT_INTERFACE
318   PSI_statement_locker_state state;
319   assert(thd->m_statement_psi == NULL);
320   thd->m_statement_psi= MYSQL_START_STATEMENT(& state,
321                                               event->get_psi_info()->m_key,
322                                               event->dbname.str,
323                                               event->dbname.length,
324                                               thd->charset(), NULL);
325 #endif
326 
327   thd->thread_stack= &my_stack;                // remember where our stack is
328   res= post_init_event_thread(thd);
329 
330   DBUG_ENTER("Event_worker_thread::run");
331   DBUG_PRINT("info", ("Time is %ld, THD: 0x%lx", (long) my_time(0), (long) thd));
332 
333   if (res)
334     goto end;
335 
336   if ((res= db_repository->load_named_event(thd, event->dbname, event->name,
337                                             &job_data)))
338   {
339     DBUG_PRINT("error", ("Got error from load_named_event"));
340     goto end;
341   }
342 
343   thd->enable_slow_log= TRUE;
344 
345   res= job_data.execute(thd, event->dropped);
346 
347   print_warnings(thd, &job_data);
348 
349   if (res)
350     sql_print_information("Event Scheduler: "
351                           "[%s].[%s.%s] event execution failed.",
352                           job_data.definer.str,
353                           job_data.dbname.str, job_data.name.str);
354 end:
355 #ifdef HAVE_PSI_STATEMENT_INTERFACE
356   MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
357   thd->m_statement_psi= NULL;
358 #endif
359 
360   assert(thd->m_digest == NULL);
361 
362   DBUG_PRINT("info", ("Done with Event %s.%s", event->dbname.str,
363              event->name.str));
364 
365   delete event;
366   deinit_event_thread(thd);
367 
368   DBUG_VOID_RETURN;
369 }
370 
371 
Event_scheduler(Event_queue * queue_arg)372 Event_scheduler::Event_scheduler(Event_queue *queue_arg)
373   :state(INITIALIZED),
374   scheduler_thd(NULL),
375   queue(queue_arg),
376   mutex_last_locked_at_line(0),
377   mutex_last_unlocked_at_line(0),
378   mutex_last_locked_in_func("n/a"),
379   mutex_last_unlocked_in_func("n/a"),
380   mutex_scheduler_data_locked(FALSE),
381   waiting_on_cond(FALSE),
382   started_events(0)
383 {
384   mysql_mutex_init(key_event_scheduler_LOCK_scheduler_state,
385                    &LOCK_scheduler_state, MY_MUTEX_INIT_FAST);
386   mysql_cond_init(key_event_scheduler_COND_state, &COND_state);
387 }
388 
389 
~Event_scheduler()390 Event_scheduler::~Event_scheduler()
391 {
392   stop();                                    /* does nothing if not running */
393   mysql_mutex_destroy(&LOCK_scheduler_state);
394   mysql_cond_destroy(&COND_state);
395 }
396 
397 
398 
399 /**
400   Starts the scheduler (again). Creates a new THD and passes it to
401   a forked thread. Does not wait for acknowledgement from the new
402   thread that it has started. Asynchronous starting. Most of the
403   needed initializations are done in the current thread to minimize
404   the chance of failure in the spawned thread.
405 
406   @param[out] err_no - errno indicating type of error which caused
407                        failure to start scheduler thread.
408 
409   @return
410     @retval false Success.
411     @retval true  Error.
412 */
413 
414 bool
start(int * err_no)415 Event_scheduler::start(int *err_no)
416 {
417   THD *new_thd= NULL;
418   bool ret= false;
419   my_thread_handle th;
420   struct scheduler_param *scheduler_param_value;
421   ulong master_access;
422   DBUG_ENTER("Event_scheduler::start");
423 
424   LOCK_DATA();
425   DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state].str));
426   if (state > INITIALIZED)
427     goto end;
428 
429   DBUG_EXECUTE_IF("event_scheduler_thread_create_failure", {
430                   *err_no= 11;
431                   Events::opt_event_scheduler= Events::EVENTS_OFF;
432                   ret= true;
433                   goto end; });
434 
435   if (!(new_thd= new THD))
436   {
437     sql_print_error("Event Scheduler: Cannot initialize the scheduler thread");
438     ret= true;
439     goto end;
440   }
441   pre_init_event_thread(new_thd);
442   new_thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER;
443   new_thd->set_command(COM_DAEMON);
444 
445   /*
446     We should run the event scheduler thread under the super-user privileges.
447     In particular, this is needed to be able to lock the mysql.event table
448     for writing when the server is running in the read-only mode.
449 
450     Same goes for transaction access mode. Set it to read-write for this thd.
451   */
452   master_access= new_thd->security_context()->master_access();
453   new_thd->security_context()->set_master_access(master_access | SUPER_ACL);
454   new_thd->variables.tx_read_only=
455            new_thd->variables.transaction_read_only= false;
456   new_thd->tx_read_only= false;
457 
458   scheduler_param_value=
459     (struct scheduler_param *)my_malloc(key_memory_Event_scheduler_scheduler_param,
460                                         sizeof(struct scheduler_param), MYF(0));
461   scheduler_param_value->thd= new_thd;
462   scheduler_param_value->scheduler= this;
463 
464   scheduler_thd= new_thd;
465   DBUG_PRINT("info", ("Setting state go RUNNING"));
466   state= RUNNING;
467   DBUG_PRINT("info", ("Forking new thread for scheduler. THD: 0x%lx", (long) new_thd));
468   if ((*err_no= mysql_thread_create(key_thread_event_scheduler,
469                                     &th, &connection_attrib,
470                                     event_scheduler_thread,
471                                     (void*)scheduler_param_value)))
472   {
473     DBUG_PRINT("error", ("cannot create a new thread"));
474     sql_print_error("Event scheduler: Failed to start scheduler,"
475                     " Can not create thread for event scheduler (errno=%d)",
476                     *err_no);
477 
478     new_thd->proc_info= "Clearing";
479     new_thd->get_protocol_classic()->end_net();
480 
481     state= INITIALIZED;
482     scheduler_thd= NULL;
483     delete new_thd;
484 
485     delete scheduler_param_value;
486     ret= true;
487   }
488 
489 end:
490   UNLOCK_DATA();
491   DBUG_RETURN(ret);
492 }
493 
494 
495 /*
496   The main loop of the scheduler.
497 
498   SYNOPSIS
499     Event_scheduler::run()
500       thd  Thread
501 
502   RETURN VALUE
503     FALSE  OK
504     TRUE   Error (Serious error)
505 */
506 
507 bool
run(THD * thd)508 Event_scheduler::run(THD *thd)
509 {
510   int res= FALSE;
511   DBUG_ENTER("Event_scheduler::run");
512 
513   sql_print_information("Event Scheduler: scheduler thread started with id %u",
514                         thd->thread_id());
515   /*
516     Recalculate the values in the queue because there could have been stops
517     in executions of the scheduler and some times could have passed by.
518   */
519   queue->recalculate_activation_times(thd);
520 
521   while (is_running())
522   {
523     Event_queue_element_for_exec *event_name;
524 
525     /* Gets a minimized version */
526     if (queue->get_top_for_execution_if_time(thd, &event_name))
527     {
528       sql_print_information("Event Scheduler: "
529                             "Serious error during getting next "
530                             "event to execute. Stopping");
531       break;
532     }
533 
534     DBUG_PRINT("info", ("get_top_for_execution_if_time returned "
535                         "event_name=0x%lx", (long) event_name));
536     if (event_name)
537     {
538       if ((res= execute_top(event_name)))
539         break;
540     }
541     else
542     {
543       assert(thd->killed);
544       DBUG_PRINT("info", ("job_data is NULL, the thread was killed"));
545     }
546     DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str));
547     free_root(thd->mem_root, MYF(0));
548   }
549 
550   LOCK_DATA();
551   deinit_event_thread(thd);
552   scheduler_thd= NULL;
553   state= INITIALIZED;
554   DBUG_PRINT("info", ("Broadcasting COND_state back to the stoppers"));
555   mysql_cond_broadcast(&COND_state);
556   UNLOCK_DATA();
557 
558   DBUG_RETURN(res);
559 }
560 
561 
562 /*
563   Creates a new THD instance and then forks a new thread, while passing
564   the THD pointer and job_data to it.
565 
566   SYNOPSIS
567     Event_scheduler::execute_top()
568 
569   RETURN VALUE
570     FALSE  OK
571     TRUE   Error (Serious error)
572 */
573 
574 bool
execute_top(Event_queue_element_for_exec * event_name)575 Event_scheduler::execute_top(Event_queue_element_for_exec *event_name)
576 {
577   THD *new_thd;
578   my_thread_handle th;
579   int res= 0;
580   DBUG_ENTER("Event_scheduler::execute_top");
581   if (!(new_thd= new THD()))
582     goto error;
583 
584   pre_init_event_thread(new_thd);
585   new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER;
586   event_name->thd= new_thd;
587   DBUG_PRINT("info", ("Event %s@%s ready for start",
588              event_name->dbname.str, event_name->name.str));
589 
590   /*
591     TODO: should use thread pool here, preferably with an upper limit
592     on number of threads: if too many events are scheduled for the
593     same time, starting all of them at once won't help them run truly
594     in parallel (because of the great amount of synchronization), so
595     we may as well execute them in sequence, keeping concurrency at a
596     reasonable level.
597   */
598   /* Major failure */
599   if ((res= mysql_thread_create(key_thread_event_worker,
600                                 &th, &connection_attrib, event_worker_thread,
601                                 event_name)))
602   {
603     mysql_mutex_lock(&LOCK_global_system_variables);
604     Events::opt_event_scheduler= Events::EVENTS_OFF;
605     mysql_mutex_unlock(&LOCK_global_system_variables);
606 
607     sql_print_error("Event_scheduler::execute_top: Can not create event worker"
608                     " thread (errno=%d). Stopping event scheduler", res);
609 
610     new_thd->proc_info= "Clearing";
611     new_thd->get_protocol_classic()->end_net();
612 
613     goto error;
614   }
615 
616   ++started_events;
617 
618   DBUG_PRINT("info", ("Event is in THD: 0x%lx", (long) new_thd));
619   DBUG_RETURN(FALSE);
620 
621 error:
622   DBUG_PRINT("error", ("Event_scheduler::execute_top() res: %d", res));
623   if (new_thd)
624     delete new_thd;
625 
626   delete event_name;
627   DBUG_RETURN(TRUE);
628 }
629 
630 
631 /*
632   Checks whether the state of the scheduler is RUNNING
633 
634   SYNOPSIS
635     Event_scheduler::is_running()
636 
637   RETURN VALUE
638     TRUE   RUNNING
639     FALSE  Not RUNNING
640 */
641 
642 bool
is_running()643 Event_scheduler::is_running()
644 {
645   LOCK_DATA();
646   bool ret= (state == RUNNING);
647   UNLOCK_DATA();
648   return ret;
649 }
650 
651 
652 /**
653   Stops the scheduler (again). Waits for acknowledgement from the
654   scheduler that it has stopped - synchronous stopping.
655 
656   Already running events will not be stopped. If the user needs
657   them stopped manual intervention is needed.
658 
659   SYNOPSIS
660     Event_scheduler::stop()
661 
662   RETURN VALUE
663     FALSE  OK
664     TRUE   Error (not reported)
665 */
666 
667 bool
stop()668 Event_scheduler::stop()
669 {
670   THD *thd= current_thd;
671   DBUG_ENTER("Event_scheduler::stop");
672   DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
673 
674   LOCK_DATA();
675   DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state].str));
676   if (state != RUNNING)
677   {
678     /* Synchronously wait until the scheduler stops. */
679     while (state != INITIALIZED)
680       COND_STATE_WAIT(thd, NULL, &stage_waiting_for_scheduler_to_stop);
681     goto end;
682   }
683 
684   /* Guarantee we don't catch spurious signals */
685   do {
686     DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from "
687                         "the scheduler thread.  Current value of state is %s . "
688                         "workers count=%d", scheduler_states_names[state].str,
689                         workers_count()));
690     /*
691       NOTE: We don't use kill_one_thread() because it can't kill COM_DEAMON
692       threads. In addition, kill_one_thread() requires THD but during shutdown
693       current_thd is NULL. Hence, if kill_one_thread should be used it has to
694       be modified to kill also daemons, by adding a flag, and also we have to
695       create artificial THD here. To save all this work, we just do what
696       kill_one_thread() does to kill a thread. See also sql_repl.cc for similar
697       usage.
698     */
699 
700     state= STOPPING;
701     DBUG_PRINT("info", ("Scheduler thread has id %u",
702                         scheduler_thd->thread_id()));
703     /* Lock from delete */
704     mysql_mutex_lock(&scheduler_thd->LOCK_thd_data);
705     /* This will wake up the thread if it waits on Queue's conditional */
706     sql_print_information("Event Scheduler: Killing the scheduler thread, "
707                           "thread id %u",
708                           scheduler_thd->thread_id());
709     scheduler_thd->awake(THD::KILL_CONNECTION);
710     mysql_mutex_unlock(&scheduler_thd->LOCK_thd_data);
711 
712     /* thd could be 0x0, when shutting down */
713     sql_print_information("Event Scheduler: "
714                           "Waiting for the scheduler thread to reply");
715     COND_STATE_WAIT(thd, NULL, &stage_waiting_for_scheduler_to_stop);
716   } while (state == STOPPING);
717   DBUG_PRINT("info", ("Scheduler thread has cleaned up. Set state to INIT"));
718   sql_print_information("Event Scheduler: Stopped");
719 end:
720   UNLOCK_DATA();
721   DBUG_RETURN(FALSE);
722 }
723 
724 /**
725   This class implements callback for do_for_all_thd().
726   It counts the total number of living event worker threads
727   from global thread list.
728 */
729 
730 class Is_worker : public Do_THD_Impl
731 {
732 public:
Is_worker()733   Is_worker() : m_count(0) {}
operator ()(THD * thd)734   virtual void operator()(THD *thd)
735   {
736     if (thd->system_thread == SYSTEM_THREAD_EVENT_WORKER)
737       m_count++;
738     return;
739   }
get_count()740   int get_count() { return m_count; }
741 private:
742   int m_count;
743 };
744 
745 /*
746   Returns the number of living event worker threads.
747 
748   SYNOPSIS
749     Event_scheduler::workers_count()
750 */
751 
752 int
workers_count()753 Event_scheduler::workers_count()
754 {
755   int count= 0;
756   Is_worker is_worker;
757   DBUG_ENTER("Event_scheduler::workers_count");
758   Global_THD_manager::get_instance()->do_for_all_thd(&is_worker);
759   count= is_worker.get_count();
760   DBUG_PRINT("exit", ("%d", count));
761   DBUG_RETURN(count);
762 }
763 
764 
765 /*
766   Auxiliary function for locking LOCK_scheduler_state. Used
767   by the LOCK_DATA macro.
768 
769   SYNOPSIS
770     Event_scheduler::lock_data()
771       func  Which function is requesting mutex lock
772       line  On which line mutex lock is requested
773 */
774 
775 void
lock_data(const char * func,uint line)776 Event_scheduler::lock_data(const char *func, uint line)
777 {
778   DBUG_ENTER("Event_scheduler::lock_data");
779   DBUG_PRINT("enter", ("func=%s line=%u", func, line));
780   mysql_mutex_lock(&LOCK_scheduler_state);
781   mutex_last_locked_in_func= func;
782   mutex_last_locked_at_line= line;
783   mutex_scheduler_data_locked= TRUE;
784   DBUG_VOID_RETURN;
785 }
786 
787 
788 /*
789   Auxiliary function for unlocking LOCK_scheduler_state. Used
790   by the UNLOCK_DATA macro.
791 
792   SYNOPSIS
793     Event_scheduler::unlock_data()
794       func  Which function is requesting mutex unlock
795       line  On which line mutex unlock is requested
796 */
797 
798 void
unlock_data(const char * func,uint line)799 Event_scheduler::unlock_data(const char *func, uint line)
800 {
801   DBUG_ENTER("Event_scheduler::unlock_data");
802   DBUG_PRINT("enter", ("func=%s line=%u", func, line));
803   mutex_last_unlocked_at_line= line;
804   mutex_scheduler_data_locked= FALSE;
805   mutex_last_unlocked_in_func= func;
806   mysql_mutex_unlock(&LOCK_scheduler_state);
807   DBUG_VOID_RETURN;
808 }
809 
810 
811 /*
812   Wrapper for mysql_cond_wait/timedwait
813 
814   SYNOPSIS
815     Event_scheduler::cond_wait()
816       thd     Thread (Could be NULL during shutdown procedure)
817       abstime If not null then call mysql_cond_timedwait()
818       msg     Message for thd->proc_info
819       func    Which function is requesting cond_wait
820       line    On which line cond_wait is requested
821 */
822 
823 void
cond_wait(THD * thd,struct timespec * abstime,const PSI_stage_info * stage,const char * src_func,const char * src_file,uint src_line)824 Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const PSI_stage_info *stage,
825                            const char *src_func, const char *src_file, uint src_line)
826 {
827   DBUG_ENTER("Event_scheduler::cond_wait");
828   waiting_on_cond= TRUE;
829   mutex_last_unlocked_at_line= src_line;
830   mutex_scheduler_data_locked= FALSE;
831   mutex_last_unlocked_in_func= src_func;
832   if (thd)
833     thd->enter_cond(&COND_state, &LOCK_scheduler_state, stage,
834                     NULL, src_func, src_file, src_line);
835 
836   DBUG_PRINT("info", ("mysql_cond_%swait", abstime? "timed":""));
837   if (!abstime)
838     mysql_cond_wait(&COND_state, &LOCK_scheduler_state);
839   else
840     mysql_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
841   if (thd)
842   {
843     /*
844       Need to unlock before exit_cond, so we need to relock.
845       Not the best thing to do but we need to obey cond_wait()
846     */
847     UNLOCK_DATA();
848     thd->exit_cond(NULL, src_func, src_file, src_line);
849     LOCK_DATA();
850   }
851   mutex_last_locked_in_func= src_func;
852   mutex_last_locked_at_line= src_line;
853   mutex_scheduler_data_locked= TRUE;
854   waiting_on_cond= FALSE;
855   DBUG_VOID_RETURN;
856 }
857 
858 
859 /*
860   Dumps the internal status of the scheduler
861 
862   SYNOPSIS
863     Event_scheduler::dump_internal_status()
864 */
865 
866 void
dump_internal_status()867 Event_scheduler::dump_internal_status()
868 {
869   DBUG_ENTER("Event_scheduler::dump_internal_status");
870 
871   puts("");
872   puts("Event scheduler status:");
873   printf("State      : %s\n", scheduler_states_names[state].str);
874   printf("Thread id  : %u\n", scheduler_thd? scheduler_thd->thread_id() : 0);
875   printf("LLA        : %s:%u\n", mutex_last_locked_in_func,
876                                  mutex_last_locked_at_line);
877   printf("LUA        : %s:%u\n", mutex_last_unlocked_in_func,
878                                  mutex_last_unlocked_at_line);
879   printf("WOC        : %s\n", waiting_on_cond? "YES":"NO");
880   printf("Workers    : %d\n", workers_count());
881   printf("Executed   : %lu\n", (ulong) started_events);
882   printf("Data locked: %s\n", mutex_scheduler_data_locked ? "YES":"NO");
883 
884   DBUG_VOID_RETURN;
885 }
886 
887 /**
888   @} (End of group Event_Scheduler)
889 */
890