1 /* Copyright (c) 2014, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "rpl_slave_commit_order_manager.h"
24 
25 #include "rpl_rli_pdb.h"     // Slave_worker
26 #include "debug_sync.h"      // debug_sync_set_action
27 
Commit_order_manager(uint32 worker_numbers)28 Commit_order_manager::Commit_order_manager(uint32 worker_numbers)
29   : m_rollback_trx(false), m_workers(worker_numbers), queue_head(QUEUE_EOF),
30     queue_tail(QUEUE_EOF)
31 {
32   mysql_mutex_init(key_commit_order_manager_mutex, &m_mutex, NULL);
33   for (uint32 i= 0; i < worker_numbers; i++)
34   {
35     mysql_cond_init(key_commit_order_manager_cond, &m_workers[i].cond);
36     m_workers[i].status= OCS_FINISH;
37   }
38 }
39 
~Commit_order_manager()40 Commit_order_manager::~Commit_order_manager()
41 {
42   mysql_mutex_destroy(&m_mutex);
43 
44   for (uint32 i= 0; i < m_workers.size(); i++)
45   {
46     mysql_cond_destroy(&m_workers[i].cond);
47   }
48 }
49 
register_trx(Slave_worker * worker)50 void Commit_order_manager::register_trx(Slave_worker *worker)
51 {
52   DBUG_ENTER("Commit_order_manager::register_trx");
53 
54   mysql_mutex_lock(&m_mutex);
55 
56   m_workers[worker->id].status= OCS_WAIT;
57   queue_push(worker->id);
58 
59   mysql_mutex_unlock(&m_mutex);
60   DBUG_VOID_RETURN;
61 }
62 
63 /**
64   Waits until it becomes the queue head.
65 
66   @retval false All previous threads succeeded so this thread can go
67   ahead and commit.
68 */
wait_for_its_turn(Slave_worker * worker,bool all)69 bool Commit_order_manager::wait_for_its_turn(Slave_worker *worker,
70                                                   bool all)
71 {
72   DBUG_ENTER("Commit_order_manager::wait_for_its_turn");
73 
74   /*
75     When prior transaction fail, current trx should stop and wait for signal
76     to rollback itself
77   */
78   if ((all || ending_single_stmt_trans(worker->info_thd, all) || m_rollback_trx) &&
79       m_workers[worker->id].status == OCS_WAIT)
80   {
81     PSI_stage_info old_stage;
82     mysql_cond_t *cond= &m_workers[worker->id].cond;
83     THD *thd= worker->info_thd;
84 
85     DBUG_PRINT("info", ("Worker %lu is waiting for commit signal", worker->id));
86 
87     DBUG_EXECUTE_IF("delay_slave_worker_0", {
88       if (worker->id == 1)
89       {
90         static const char act[]= "now SIGNAL signal.w1.wait_for_its_turn";
91         assert(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
92       }
93     });
94 
95     mysql_mutex_lock(&m_mutex);
96     thd->ENTER_COND(cond, &m_mutex,
97                     &stage_worker_waiting_for_its_turn_to_commit,
98                     &old_stage);
99 
100     while (queue_front() != worker->id)
101     {
102       if (unlikely(worker->found_order_commit_deadlock()))
103       {
104         mysql_mutex_unlock(&m_mutex);
105         thd->EXIT_COND(&old_stage);
106         my_error(ER_LOCK_DEADLOCK, MYF(0));
107         DBUG_RETURN(true);
108       }
109       mysql_cond_wait(cond, &m_mutex);
110     }
111 
112     mysql_mutex_unlock(&m_mutex);
113     thd->EXIT_COND(&old_stage);
114 
115     m_workers[worker->id].status= OCS_SIGNAL;
116 
117     if (m_rollback_trx)
118     {
119       unregister_trx(worker);
120 
121       DBUG_PRINT("info", ("thd has seen an error signal from old thread"));
122       thd->get_stmt_da()->set_overwrite_status(true);
123       my_error(ER_SLAVE_WORKER_STOPPED_PREVIOUS_THD_ERROR, MYF(0));
124     }
125   }
126 
127   DBUG_RETURN(m_rollback_trx);
128 }
129 
unregister_trx(Slave_worker * worker)130 void Commit_order_manager::unregister_trx(Slave_worker *worker)
131 {
132   DBUG_ENTER("Commit_order_manager::unregister_trx");
133 
134   if (m_workers[worker->id].status == OCS_SIGNAL)
135   {
136     DBUG_PRINT("info", ("Worker %lu is signalling next transaction", worker->id));
137 
138     mysql_mutex_lock(&m_mutex);
139 
140     assert(!queue_empty());
141 
142     /* Set next manager as the head and signal the trx to commit. */
143     queue_pop();
144     if (!queue_empty())
145       mysql_cond_signal(&m_workers[queue_front()].cond);
146 
147     m_workers[worker->id].status= OCS_FINISH;
148 
149     mysql_mutex_unlock(&m_mutex);
150   }
151 
152   DBUG_VOID_RETURN;
153 }
154 
report_rollback(Slave_worker * worker)155 void Commit_order_manager::report_rollback(Slave_worker *worker)
156 {
157   DBUG_ENTER("Commit_order_manager::report_rollback");
158 
159   (void) wait_for_its_turn(worker, true);
160   /* No worker can set m_rollback_trx unless it is its turn to commit */
161   m_rollback_trx= true;
162   unregister_trx(worker);
163 
164   DBUG_VOID_RETURN;
165 }
166 
report_deadlock(Slave_worker * worker)167 void Commit_order_manager::report_deadlock(Slave_worker *worker)
168 {
169   DBUG_ENTER("Commit_order_manager::report_deadlock");
170   mysql_mutex_lock(&m_mutex);
171   worker->report_order_commit_deadlock();
172   DBUG_EXECUTE_IF("rpl_fake_cod_deadlock",
173                   {
174                   const char act[]= "now signal reported_deadlock";
175                   assert(!debug_sync_set_action(current_thd,
176                                                 STRING_WITH_LEN(act)));
177                   });
178   mysql_cond_signal(&m_workers[worker->id].cond);
179   mysql_mutex_unlock(&m_mutex);
180   DBUG_VOID_RETURN;
181 }
182 
183