1 /* Copyright (c) 2006, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "event_scheduler.h"
24
25 #include "events.h"
26 #include "event_data_objects.h"
27 #include "event_queue.h"
28 #include "event_db_repository.h"
29 #include "auth_common.h" // SUPER_ACL
30 #include "log.h"
31 #include "mysqld_thd_manager.h" // Global_THD_manager
32 #include "sql_error.h" // Sql_condition
33 #include "sql_class.h" // THD
34
35 /**
36 @addtogroup Event_Scheduler
37 @{
38 */
39
40 #define LOCK_DATA() lock_data(__func__, __LINE__)
41 #define UNLOCK_DATA() unlock_data(__func__, __LINE__)
42 #define COND_STATE_WAIT(mythd, abstime, stage) \
43 cond_wait(mythd, abstime, stage, __func__, __FILE__, __LINE__)
44
45 extern my_thread_attr_t connection_attrib;
46
47
48 Event_db_repository *Event_worker_thread::db_repository;
49
50
51 static
52 const LEX_STRING scheduler_states_names[] =
53 {
54 { C_STRING_WITH_LEN("INITIALIZED") },
55 { C_STRING_WITH_LEN("RUNNING") },
56 { C_STRING_WITH_LEN("STOPPING") }
57 };
58
59 struct scheduler_param {
60 THD *thd;
61 Event_scheduler *scheduler;
62 };
63
64
65 /*
66 Prints the stack of infos, warnings, errors from thd to
67 the console so it can be fetched by the logs-into-tables and
68 checked later.
69
70 SYNOPSIS
71 evex_print_warnings
72 thd Thread used during the execution of the event
73 et The event itself
74 */
75
76 void
print_warnings(THD * thd,Event_job_data * et)77 Event_worker_thread::print_warnings(THD *thd, Event_job_data *et)
78 {
79 const Sql_condition *err;
80 DBUG_ENTER("evex_print_warnings");
81 if (thd->get_stmt_da()->cond_count() == 0)
82 DBUG_VOID_RETURN;
83
84 char msg_buf[10 * STRING_BUFFER_USUAL_SIZE];
85 char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE];
86 String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info);
87 prefix.length(0);
88 prefix.append("Event Scheduler: [");
89
90 prefix.append(et->definer.str, et->definer.length, system_charset_info);
91 prefix.append("][", 2);
92 prefix.append(et->dbname.str, et->dbname.length, system_charset_info);
93 prefix.append('.');
94 prefix.append(et->name.str, et->name.length, system_charset_info);
95 prefix.append("] ", 2);
96
97 Diagnostics_area::Sql_condition_iterator it=
98 thd->get_stmt_da()->sql_conditions();
99 while ((err= it++))
100 {
101 String err_msg(msg_buf, sizeof(msg_buf), system_charset_info);
102 /* set it to 0 or we start adding at the end. That's the trick ;) */
103 err_msg.length(0);
104 err_msg.append(prefix);
105 err_msg.append(err->message_text(),
106 err->message_octet_length(), system_charset_info);
107 switch (err->severity())
108 {
109 case Sql_condition::SL_ERROR:
110 sql_print_error("%*s", static_cast<int>(err_msg.length()), err_msg.c_ptr());
111 break;
112 case Sql_condition::SL_WARNING:
113 sql_print_warning("%*s", static_cast<int>(err_msg.length()), err_msg.c_ptr());
114 break;
115 case Sql_condition::SL_NOTE:
116 sql_print_information("%*s", static_cast<int>(err_msg.length()), err_msg.c_ptr());
117 break;
118 default:
119 assert(false);
120 }
121 }
122 DBUG_VOID_RETURN;
123 }
124
125
126 /*
127 Performs post initialization of structures in a new thread.
128
129 SYNOPSIS
130 post_init_event_thread()
131 thd Thread
132
133 NOTES
134 Before this is called, one should not do any DBUG_XXX() calls.
135
136 */
137
138 bool
post_init_event_thread(THD * thd)139 post_init_event_thread(THD *thd)
140 {
141 if (my_thread_init() || thd->store_globals())
142 {
143 return TRUE;
144 }
145
146 Global_THD_manager *thd_manager= Global_THD_manager::get_instance();
147 thd_manager->add_thd(thd);
148 thd_manager->inc_thread_running();
149 return FALSE;
150 }
151
152
153 /*
154 Cleans up the THD and the threaded environment of the thread.
155
156 SYNOPSIS
157 deinit_event_thread()
158 thd Thread
159 */
160
161 void
deinit_event_thread(THD * thd)162 deinit_event_thread(THD *thd)
163 {
164 Global_THD_manager *thd_manager= Global_THD_manager::get_instance();
165 thd->proc_info= "Clearing";
166 thd->get_protocol_classic()->end_net();
167 DBUG_PRINT("exit", ("Event thread finishing"));
168
169 thd->release_resources();
170 thd_manager->remove_thd(thd);
171 thd_manager->dec_thread_running();
172 delete thd;
173 }
174
175
176 /*
177 Performs pre- mysql_thread_create() initialisation of THD. Do this
178 in the thread that will pass THD to the child thread. In the
179 child thread call post_init_event_thread().
180
181 SYNOPSIS
182 pre_init_event_thread()
183 thd The THD of the thread. Has to be allocated by the caller.
184
185 NOTES
186 1. The host of the thead is my_localhost
187 2. thd->net is initted with NULL - no communication.
188 */
189
190 void
pre_init_event_thread(THD * thd)191 pre_init_event_thread(THD* thd)
192 {
193 DBUG_ENTER("pre_init_event_thread");
194 thd->security_context()->set_master_access(0);
195 thd->security_context()->set_db_access(0);
196 thd->security_context()->set_host_or_ip_ptr((char *) my_localhost,
197 strlen(my_localhost));
198 thd->get_protocol_classic()->init_net(NULL);
199 thd->security_context()->set_user_ptr(C_STRING_WITH_LEN("event_scheduler"));
200 thd->get_protocol_classic()->get_net()->read_timeout= slave_net_timeout;
201 thd->slave_thread= 0;
202 thd->variables.option_bits|= OPTION_AUTO_IS_NULL;
203 thd->get_protocol_classic()->set_client_capabilities(CLIENT_MULTI_RESULTS);
204
205 thd->set_new_thread_id();
206 /*
207 Guarantees that we will see the thread in SHOW PROCESSLIST though its
208 vio is NULL.
209 */
210
211 thd->proc_info= "Initialized";
212 thd->set_time();
213
214 /* Do not use user-supplied timeout value for system threads. */
215 thd->variables.lock_wait_timeout= LONG_TIMEOUT;
216
217 DBUG_VOID_RETURN;
218 }
219
220
221 /*
222 Function that executes the scheduler,
223
224 SYNOPSIS
225 event_scheduler_thread()
226 arg Pointer to `struct scheduler_param`
227
228 RETURN VALUE
229 0 OK
230 */
231
event_scheduler_thread(void * arg)232 extern "C" void *event_scheduler_thread(void *arg)
233 {
234 /* needs to be first for thread_stack */
235 THD *thd= ((struct scheduler_param *) arg)->thd;
236 Event_scheduler *scheduler= ((struct scheduler_param *) arg)->scheduler;
237 bool res;
238
239 thd->thread_stack= (char *)&thd; // remember where our stack is
240
241 mysql_thread_set_psi_id(thd->thread_id());
242
243 res= post_init_event_thread(thd);
244
245 DBUG_ENTER("event_scheduler_thread");
246 my_claim(arg);
247 thd->claim_memory_ownership();
248 my_free(arg);
249 if (!res)
250 scheduler->run(thd);
251 else
252 {
253 thd->proc_info= "Clearing";
254 thd->get_protocol_classic()->end_net();
255 delete thd;
256 }
257
258 DBUG_LEAVE; // Against gcc warnings
259 my_thread_end();
260 return 0;
261 }
262
263
264 /**
265 Function that executes an event in a child thread. Setups the
266 environment for the event execution and cleans after that.
267
268 SYNOPSIS
269 event_worker_thread()
270 arg The Event_job_data object to be processed
271
272 RETURN VALUE
273 0 OK
274 */
275
event_worker_thread(void * arg)276 extern "C" void *event_worker_thread(void *arg)
277 {
278 THD *thd;
279 Event_queue_element_for_exec *event= (Event_queue_element_for_exec *)arg;
280
281 event->claim_memory_ownership();
282
283 thd= event->thd;
284
285 thd->claim_memory_ownership();
286
287 mysql_thread_set_psi_id(thd->thread_id());
288
289 Event_worker_thread worker_thread;
290 worker_thread.run(thd, event);
291
292 my_thread_end();
293 return 0; // Can't return anything here
294 }
295
296
297 /**
298 Function that executes an event in a child thread. Setups the
299 environment for the event execution and cleans after that.
300
301 SYNOPSIS
302 Event_worker_thread::run()
303 thd Thread context
304 event The Event_queue_element_for_exec object to be processed
305 */
306
307 void
run(THD * thd,Event_queue_element_for_exec * event)308 Event_worker_thread::run(THD *thd, Event_queue_element_for_exec *event)
309 {
310 /* needs to be first for thread_stack */
311 char my_stack;
312 Event_job_data job_data;
313 bool res;
314
315 assert(thd->m_digest == NULL);
316
317 #ifdef HAVE_PSI_STATEMENT_INTERFACE
318 PSI_statement_locker_state state;
319 assert(thd->m_statement_psi == NULL);
320 thd->m_statement_psi= MYSQL_START_STATEMENT(& state,
321 event->get_psi_info()->m_key,
322 event->dbname.str,
323 event->dbname.length,
324 thd->charset(), NULL);
325 #endif
326
327 thd->thread_stack= &my_stack; // remember where our stack is
328 res= post_init_event_thread(thd);
329
330 DBUG_ENTER("Event_worker_thread::run");
331 DBUG_PRINT("info", ("Time is %ld, THD: 0x%lx", (long) my_time(0), (long) thd));
332
333 if (res)
334 goto end;
335
336 if ((res= db_repository->load_named_event(thd, event->dbname, event->name,
337 &job_data)))
338 {
339 DBUG_PRINT("error", ("Got error from load_named_event"));
340 goto end;
341 }
342
343 thd->enable_slow_log= TRUE;
344
345 res= job_data.execute(thd, event->dropped);
346
347 print_warnings(thd, &job_data);
348
349 if (res)
350 sql_print_information("Event Scheduler: "
351 "[%s].[%s.%s] event execution failed.",
352 job_data.definer.str,
353 job_data.dbname.str, job_data.name.str);
354 end:
355 #ifdef HAVE_PSI_STATEMENT_INTERFACE
356 MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
357 thd->m_statement_psi= NULL;
358 #endif
359
360 assert(thd->m_digest == NULL);
361
362 DBUG_PRINT("info", ("Done with Event %s.%s", event->dbname.str,
363 event->name.str));
364
365 delete event;
366 deinit_event_thread(thd);
367
368 DBUG_VOID_RETURN;
369 }
370
371
Event_scheduler(Event_queue * queue_arg)372 Event_scheduler::Event_scheduler(Event_queue *queue_arg)
373 :state(INITIALIZED),
374 scheduler_thd(NULL),
375 queue(queue_arg),
376 mutex_last_locked_at_line(0),
377 mutex_last_unlocked_at_line(0),
378 mutex_last_locked_in_func("n/a"),
379 mutex_last_unlocked_in_func("n/a"),
380 mutex_scheduler_data_locked(FALSE),
381 waiting_on_cond(FALSE),
382 started_events(0)
383 {
384 mysql_mutex_init(key_event_scheduler_LOCK_scheduler_state,
385 &LOCK_scheduler_state, MY_MUTEX_INIT_FAST);
386 mysql_cond_init(key_event_scheduler_COND_state, &COND_state);
387 }
388
389
~Event_scheduler()390 Event_scheduler::~Event_scheduler()
391 {
392 stop(); /* does nothing if not running */
393 mysql_mutex_destroy(&LOCK_scheduler_state);
394 mysql_cond_destroy(&COND_state);
395 }
396
397
398
399 /**
400 Starts the scheduler (again). Creates a new THD and passes it to
401 a forked thread. Does not wait for acknowledgement from the new
402 thread that it has started. Asynchronous starting. Most of the
403 needed initializations are done in the current thread to minimize
404 the chance of failure in the spawned thread.
405
406 @param[out] err_no - errno indicating type of error which caused
407 failure to start scheduler thread.
408
409 @return
410 @retval false Success.
411 @retval true Error.
412 */
413
414 bool
start(int * err_no)415 Event_scheduler::start(int *err_no)
416 {
417 THD *new_thd= NULL;
418 bool ret= false;
419 my_thread_handle th;
420 struct scheduler_param *scheduler_param_value;
421 ulong master_access;
422 DBUG_ENTER("Event_scheduler::start");
423
424 LOCK_DATA();
425 DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state].str));
426 if (state > INITIALIZED)
427 goto end;
428
429 DBUG_EXECUTE_IF("event_scheduler_thread_create_failure", {
430 *err_no= 11;
431 Events::opt_event_scheduler= Events::EVENTS_OFF;
432 ret= true;
433 goto end; });
434
435 if (!(new_thd= new THD))
436 {
437 sql_print_error("Event Scheduler: Cannot initialize the scheduler thread");
438 ret= true;
439 goto end;
440 }
441 pre_init_event_thread(new_thd);
442 new_thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER;
443 new_thd->set_command(COM_DAEMON);
444
445 /*
446 We should run the event scheduler thread under the super-user privileges.
447 In particular, this is needed to be able to lock the mysql.event table
448 for writing when the server is running in the read-only mode.
449
450 Same goes for transaction access mode. Set it to read-write for this thd.
451 */
452 master_access= new_thd->security_context()->master_access();
453 new_thd->security_context()->set_master_access(master_access | SUPER_ACL);
454 new_thd->variables.tx_read_only=
455 new_thd->variables.transaction_read_only= false;
456 new_thd->tx_read_only= false;
457
458 scheduler_param_value=
459 (struct scheduler_param *)my_malloc(key_memory_Event_scheduler_scheduler_param,
460 sizeof(struct scheduler_param), MYF(0));
461 scheduler_param_value->thd= new_thd;
462 scheduler_param_value->scheduler= this;
463
464 scheduler_thd= new_thd;
465 DBUG_PRINT("info", ("Setting state go RUNNING"));
466 state= RUNNING;
467 DBUG_PRINT("info", ("Forking new thread for scheduler. THD: 0x%lx", (long) new_thd));
468 if ((*err_no= mysql_thread_create(key_thread_event_scheduler,
469 &th, &connection_attrib,
470 event_scheduler_thread,
471 (void*)scheduler_param_value)))
472 {
473 DBUG_PRINT("error", ("cannot create a new thread"));
474 sql_print_error("Event scheduler: Failed to start scheduler,"
475 " Can not create thread for event scheduler (errno=%d)",
476 *err_no);
477
478 new_thd->proc_info= "Clearing";
479 new_thd->get_protocol_classic()->end_net();
480
481 state= INITIALIZED;
482 scheduler_thd= NULL;
483 delete new_thd;
484
485 delete scheduler_param_value;
486 ret= true;
487 }
488
489 end:
490 UNLOCK_DATA();
491 DBUG_RETURN(ret);
492 }
493
494
495 /*
496 The main loop of the scheduler.
497
498 SYNOPSIS
499 Event_scheduler::run()
500 thd Thread
501
502 RETURN VALUE
503 FALSE OK
504 TRUE Error (Serious error)
505 */
506
507 bool
run(THD * thd)508 Event_scheduler::run(THD *thd)
509 {
510 int res= FALSE;
511 DBUG_ENTER("Event_scheduler::run");
512
513 sql_print_information("Event Scheduler: scheduler thread started with id %u",
514 thd->thread_id());
515 /*
516 Recalculate the values in the queue because there could have been stops
517 in executions of the scheduler and some times could have passed by.
518 */
519 queue->recalculate_activation_times(thd);
520
521 while (is_running())
522 {
523 Event_queue_element_for_exec *event_name;
524
525 /* Gets a minimized version */
526 if (queue->get_top_for_execution_if_time(thd, &event_name))
527 {
528 sql_print_information("Event Scheduler: "
529 "Serious error during getting next "
530 "event to execute. Stopping");
531 break;
532 }
533
534 DBUG_PRINT("info", ("get_top_for_execution_if_time returned "
535 "event_name=0x%lx", (long) event_name));
536 if (event_name)
537 {
538 if ((res= execute_top(event_name)))
539 break;
540 }
541 else
542 {
543 assert(thd->killed);
544 DBUG_PRINT("info", ("job_data is NULL, the thread was killed"));
545 }
546 DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str));
547 free_root(thd->mem_root, MYF(0));
548 }
549
550 LOCK_DATA();
551 deinit_event_thread(thd);
552 scheduler_thd= NULL;
553 state= INITIALIZED;
554 DBUG_PRINT("info", ("Broadcasting COND_state back to the stoppers"));
555 mysql_cond_broadcast(&COND_state);
556 UNLOCK_DATA();
557
558 DBUG_RETURN(res);
559 }
560
561
562 /*
563 Creates a new THD instance and then forks a new thread, while passing
564 the THD pointer and job_data to it.
565
566 SYNOPSIS
567 Event_scheduler::execute_top()
568
569 RETURN VALUE
570 FALSE OK
571 TRUE Error (Serious error)
572 */
573
574 bool
execute_top(Event_queue_element_for_exec * event_name)575 Event_scheduler::execute_top(Event_queue_element_for_exec *event_name)
576 {
577 THD *new_thd;
578 my_thread_handle th;
579 int res= 0;
580 DBUG_ENTER("Event_scheduler::execute_top");
581 if (!(new_thd= new THD()))
582 goto error;
583
584 pre_init_event_thread(new_thd);
585 new_thd->system_thread= SYSTEM_THREAD_EVENT_WORKER;
586 event_name->thd= new_thd;
587 DBUG_PRINT("info", ("Event %s@%s ready for start",
588 event_name->dbname.str, event_name->name.str));
589
590 /*
591 TODO: should use thread pool here, preferably with an upper limit
592 on number of threads: if too many events are scheduled for the
593 same time, starting all of them at once won't help them run truly
594 in parallel (because of the great amount of synchronization), so
595 we may as well execute them in sequence, keeping concurrency at a
596 reasonable level.
597 */
598 /* Major failure */
599 if ((res= mysql_thread_create(key_thread_event_worker,
600 &th, &connection_attrib, event_worker_thread,
601 event_name)))
602 {
603 mysql_mutex_lock(&LOCK_global_system_variables);
604 Events::opt_event_scheduler= Events::EVENTS_OFF;
605 mysql_mutex_unlock(&LOCK_global_system_variables);
606
607 sql_print_error("Event_scheduler::execute_top: Can not create event worker"
608 " thread (errno=%d). Stopping event scheduler", res);
609
610 new_thd->proc_info= "Clearing";
611 new_thd->get_protocol_classic()->end_net();
612
613 goto error;
614 }
615
616 ++started_events;
617
618 DBUG_PRINT("info", ("Event is in THD: 0x%lx", (long) new_thd));
619 DBUG_RETURN(FALSE);
620
621 error:
622 DBUG_PRINT("error", ("Event_scheduler::execute_top() res: %d", res));
623 if (new_thd)
624 delete new_thd;
625
626 delete event_name;
627 DBUG_RETURN(TRUE);
628 }
629
630
631 /*
632 Checks whether the state of the scheduler is RUNNING
633
634 SYNOPSIS
635 Event_scheduler::is_running()
636
637 RETURN VALUE
638 TRUE RUNNING
639 FALSE Not RUNNING
640 */
641
642 bool
is_running()643 Event_scheduler::is_running()
644 {
645 LOCK_DATA();
646 bool ret= (state == RUNNING);
647 UNLOCK_DATA();
648 return ret;
649 }
650
651
652 /**
653 Stops the scheduler (again). Waits for acknowledgement from the
654 scheduler that it has stopped - synchronous stopping.
655
656 Already running events will not be stopped. If the user needs
657 them stopped manual intervention is needed.
658
659 SYNOPSIS
660 Event_scheduler::stop()
661
662 RETURN VALUE
663 FALSE OK
664 TRUE Error (not reported)
665 */
666
667 bool
stop()668 Event_scheduler::stop()
669 {
670 THD *thd= current_thd;
671 DBUG_ENTER("Event_scheduler::stop");
672 DBUG_PRINT("enter", ("thd: 0x%lx", (long) thd));
673
674 LOCK_DATA();
675 DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state].str));
676 if (state != RUNNING)
677 {
678 /* Synchronously wait until the scheduler stops. */
679 while (state != INITIALIZED)
680 COND_STATE_WAIT(thd, NULL, &stage_waiting_for_scheduler_to_stop);
681 goto end;
682 }
683
684 /* Guarantee we don't catch spurious signals */
685 do {
686 DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from "
687 "the scheduler thread. Current value of state is %s . "
688 "workers count=%d", scheduler_states_names[state].str,
689 workers_count()));
690 /*
691 NOTE: We don't use kill_one_thread() because it can't kill COM_DEAMON
692 threads. In addition, kill_one_thread() requires THD but during shutdown
693 current_thd is NULL. Hence, if kill_one_thread should be used it has to
694 be modified to kill also daemons, by adding a flag, and also we have to
695 create artificial THD here. To save all this work, we just do what
696 kill_one_thread() does to kill a thread. See also sql_repl.cc for similar
697 usage.
698 */
699
700 state= STOPPING;
701 DBUG_PRINT("info", ("Scheduler thread has id %u",
702 scheduler_thd->thread_id()));
703 /* Lock from delete */
704 mysql_mutex_lock(&scheduler_thd->LOCK_thd_data);
705 /* This will wake up the thread if it waits on Queue's conditional */
706 sql_print_information("Event Scheduler: Killing the scheduler thread, "
707 "thread id %u",
708 scheduler_thd->thread_id());
709 scheduler_thd->awake(THD::KILL_CONNECTION);
710 mysql_mutex_unlock(&scheduler_thd->LOCK_thd_data);
711
712 /* thd could be 0x0, when shutting down */
713 sql_print_information("Event Scheduler: "
714 "Waiting for the scheduler thread to reply");
715 COND_STATE_WAIT(thd, NULL, &stage_waiting_for_scheduler_to_stop);
716 } while (state == STOPPING);
717 DBUG_PRINT("info", ("Scheduler thread has cleaned up. Set state to INIT"));
718 sql_print_information("Event Scheduler: Stopped");
719 end:
720 UNLOCK_DATA();
721 DBUG_RETURN(FALSE);
722 }
723
724 /**
725 This class implements callback for do_for_all_thd().
726 It counts the total number of living event worker threads
727 from global thread list.
728 */
729
730 class Is_worker : public Do_THD_Impl
731 {
732 public:
Is_worker()733 Is_worker() : m_count(0) {}
operator ()(THD * thd)734 virtual void operator()(THD *thd)
735 {
736 if (thd->system_thread == SYSTEM_THREAD_EVENT_WORKER)
737 m_count++;
738 return;
739 }
get_count()740 int get_count() { return m_count; }
741 private:
742 int m_count;
743 };
744
745 /*
746 Returns the number of living event worker threads.
747
748 SYNOPSIS
749 Event_scheduler::workers_count()
750 */
751
752 int
workers_count()753 Event_scheduler::workers_count()
754 {
755 int count= 0;
756 Is_worker is_worker;
757 DBUG_ENTER("Event_scheduler::workers_count");
758 Global_THD_manager::get_instance()->do_for_all_thd(&is_worker);
759 count= is_worker.get_count();
760 DBUG_PRINT("exit", ("%d", count));
761 DBUG_RETURN(count);
762 }
763
764
765 /*
766 Auxiliary function for locking LOCK_scheduler_state. Used
767 by the LOCK_DATA macro.
768
769 SYNOPSIS
770 Event_scheduler::lock_data()
771 func Which function is requesting mutex lock
772 line On which line mutex lock is requested
773 */
774
775 void
lock_data(const char * func,uint line)776 Event_scheduler::lock_data(const char *func, uint line)
777 {
778 DBUG_ENTER("Event_scheduler::lock_data");
779 DBUG_PRINT("enter", ("func=%s line=%u", func, line));
780 mysql_mutex_lock(&LOCK_scheduler_state);
781 mutex_last_locked_in_func= func;
782 mutex_last_locked_at_line= line;
783 mutex_scheduler_data_locked= TRUE;
784 DBUG_VOID_RETURN;
785 }
786
787
788 /*
789 Auxiliary function for unlocking LOCK_scheduler_state. Used
790 by the UNLOCK_DATA macro.
791
792 SYNOPSIS
793 Event_scheduler::unlock_data()
794 func Which function is requesting mutex unlock
795 line On which line mutex unlock is requested
796 */
797
798 void
unlock_data(const char * func,uint line)799 Event_scheduler::unlock_data(const char *func, uint line)
800 {
801 DBUG_ENTER("Event_scheduler::unlock_data");
802 DBUG_PRINT("enter", ("func=%s line=%u", func, line));
803 mutex_last_unlocked_at_line= line;
804 mutex_scheduler_data_locked= FALSE;
805 mutex_last_unlocked_in_func= func;
806 mysql_mutex_unlock(&LOCK_scheduler_state);
807 DBUG_VOID_RETURN;
808 }
809
810
811 /*
812 Wrapper for mysql_cond_wait/timedwait
813
814 SYNOPSIS
815 Event_scheduler::cond_wait()
816 thd Thread (Could be NULL during shutdown procedure)
817 abstime If not null then call mysql_cond_timedwait()
818 msg Message for thd->proc_info
819 func Which function is requesting cond_wait
820 line On which line cond_wait is requested
821 */
822
823 void
cond_wait(THD * thd,struct timespec * abstime,const PSI_stage_info * stage,const char * src_func,const char * src_file,uint src_line)824 Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const PSI_stage_info *stage,
825 const char *src_func, const char *src_file, uint src_line)
826 {
827 DBUG_ENTER("Event_scheduler::cond_wait");
828 waiting_on_cond= TRUE;
829 mutex_last_unlocked_at_line= src_line;
830 mutex_scheduler_data_locked= FALSE;
831 mutex_last_unlocked_in_func= src_func;
832 if (thd)
833 thd->enter_cond(&COND_state, &LOCK_scheduler_state, stage,
834 NULL, src_func, src_file, src_line);
835
836 DBUG_PRINT("info", ("mysql_cond_%swait", abstime? "timed":""));
837 if (!abstime)
838 mysql_cond_wait(&COND_state, &LOCK_scheduler_state);
839 else
840 mysql_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
841 if (thd)
842 {
843 /*
844 Need to unlock before exit_cond, so we need to relock.
845 Not the best thing to do but we need to obey cond_wait()
846 */
847 UNLOCK_DATA();
848 thd->exit_cond(NULL, src_func, src_file, src_line);
849 LOCK_DATA();
850 }
851 mutex_last_locked_in_func= src_func;
852 mutex_last_locked_at_line= src_line;
853 mutex_scheduler_data_locked= TRUE;
854 waiting_on_cond= FALSE;
855 DBUG_VOID_RETURN;
856 }
857
858
859 /*
860 Dumps the internal status of the scheduler
861
862 SYNOPSIS
863 Event_scheduler::dump_internal_status()
864 */
865
866 void
dump_internal_status()867 Event_scheduler::dump_internal_status()
868 {
869 DBUG_ENTER("Event_scheduler::dump_internal_status");
870
871 puts("");
872 puts("Event scheduler status:");
873 printf("State : %s\n", scheduler_states_names[state].str);
874 printf("Thread id : %u\n", scheduler_thd? scheduler_thd->thread_id() : 0);
875 printf("LLA : %s:%u\n", mutex_last_locked_in_func,
876 mutex_last_locked_at_line);
877 printf("LUA : %s:%u\n", mutex_last_unlocked_in_func,
878 mutex_last_unlocked_at_line);
879 printf("WOC : %s\n", waiting_on_cond? "YES":"NO");
880 printf("Workers : %d\n", workers_count());
881 printf("Executed : %lu\n", (ulong) started_events);
882 printf("Data locked: %s\n", mutex_scheduler_data_locked ? "YES":"NO");
883
884 DBUG_VOID_RETURN;
885 }
886
887 /**
888 @} (End of group Event_Scheduler)
889 */
890