1 /* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #ifndef RPL_COMMIT_STAGE_MANAGER
24 #define RPL_COMMIT_STAGE_MANAGER
25 
26 #include <atomic>
27 #include <utility>
28 
29 #include "my_dbug.h"
30 #include "mysql/psi/mysql_cond.h"
31 #include "mysql/psi/mysql_mutex.h"
32 #include "sql/sql_class.h"
33 #include "thr_mutex.h"
34 
35 class THD;
36 
37 /**
38   Class for maintaining the commit stages for binary log group commit.
39  */
40 class Commit_stage_manager {
41  public:
42   class Mutex_queue {
43     friend class Commit_stage_manager;
44 
45    public:
Mutex_queue()46     Mutex_queue() : m_first(nullptr), m_last(&m_first), m_size(0) {}
47 
init(mysql_mutex_t * lock)48     void init(mysql_mutex_t *lock) { m_lock = lock; }
49 
is_empty()50     bool is_empty() const { return m_first == nullptr; }
51 
52     /**
53       Append a linked list of threads to the queue.
54 
55       @param[in]  first  Linked list of threads to be appended to queue
56 
57       @retval true The queue was empty before this operation.
58       @retval false The queue was non-empty before this operation.
59     */
60     bool append(THD *first);
61 
62     /**
63       Fetch the entire queue for a stage. It is a wrapper over
64       fetch_and_empty() and acquires queue lock before fetching
65       and emptying the queue threads.
66 
67       @return  Pointer to the first session of the queue.
68     */
69     THD *fetch_and_empty_acquire_lock();
70 
71     /**
72       Fetch the entire queue for a stage. It is a wrapper over
73       fetch_and_empty(). The caller must acquire queue lock before
74       calling this function.
75 
76       @return  Pointer to the first session of the queue.
77     */
78     THD *fetch_and_empty_skip_acquire_lock();
79 
80     /**
81       Remove first member from the queue
82 
83       @retval  Returns std::pair<bool, THD *> object.
84                The first boolean value of pair if true determines queue
85                is not empty, and false determines queue is empty.
86                The second value returns the first removed member.
87     */
88     std::pair<bool, THD *> pop_front();
89 
90     /**
91       Get number of elements in the queue.
92 
93       @retval  Returns number of element in the queue.
94     */
get_size()95     inline int32 get_size() { return m_size.load(); }
96 
97     /**
98       Fetch the first thread of the queue.
99 
100       @return first thread of the queue.
101     */
get_leader()102     THD *get_leader() { return m_first; }
103 
lock()104     void lock() {
105       mysql_mutex_assert_not_owner(m_lock);
106       mysql_mutex_lock(m_lock);
107     }
108 
unlock()109     void unlock() { mysql_mutex_unlock(m_lock); }
110 
assert_owner()111     void assert_owner() { mysql_mutex_assert_owner(m_lock); }
112 
113    private:
114     /**
115       Fetch the entire queue for a stage.
116 
117       @retval  This will fetch the entire queue in one go.
118     */
119     THD *fetch_and_empty();
120 
121     /**
122        Pointer to the first thread in the queue, or nullptr if the queue is
123        empty.
124     */
125     THD *m_first;
126 
127     /**
128        Pointer to the location holding the end of the queue.
129 
130        This is either @c &first, or a pointer to the @c next_to_commit of
131        the last thread that is enqueued.
132     */
133     THD **m_last;
134 
135     /** size of the queue */
136     std::atomic<int32> m_size;
137 
138     /** Lock for protecting the queue. */
139     mysql_mutex_t *m_lock;
140 
141     /*
142       This attribute did not have the desired effect, at least not according
143       to -fsanitize=undefined with gcc 5.2.1
144      */
145   };  // MY_ATTRIBUTE((aligned(CPU_LEVEL1_DCACHE_LINESIZE)));
146 
147  private:
Commit_stage_manager()148   Commit_stage_manager() : m_is_initialized(false) {}
149 
150   Commit_stage_manager(const Commit_stage_manager &) = delete;
151 
152   const Commit_stage_manager &operator=(const Commit_stage_manager &) = delete;
153 
154  public:
155   /**
156     Fetch Commit_stage_manager class instance.
157 
158     @return Reference to the Commit_stage_manager class instance.
159   */
160   static Commit_stage_manager &get_instance();
161 
162   /**
163      Constants for queues for different stages.
164    */
165   enum StageID {
166     BINLOG_FLUSH_STAGE,
167     SYNC_STAGE,
168     COMMIT_STAGE,
169     COMMIT_ORDER_FLUSH_STAGE,
170     STAGE_COUNTER
171   };
172 
173   /**
174     Initializes m_stage_cond_binlog, m_stage_cond_commit_order,
175     m_stage_cond_leader condition variables and m_lock_done mutex.
176 
177     The binlog follower threads blocks on m_stage_cond_binlog condition
178     variable till signalled to wake up from leader thread. And similarly
179     commit order follower threads blocks on m_stage_cond_commit_order
180     condition variable till signalled to wake up from leader thread.
181 
182     The first binlog thread supposed to be leader finds that commit order queue
183     is not empty then it blocks on m_stage_cond_leader till commit order leader
184     signals it to awake and become new leader.
185 
186     m_lock_done mutex is shared by all three stages.
187 
188     @param key_LOCK_flush_queue mutex instrumentation key
189     @param key_LOCK_sync_queue mutex instrumentation key
190     @param key_LOCK_commit_queue mutex instrumentation key
191     @param key_LOCK_done mutex instrumentation key
192     @param key_COND_done cond instrumentation key
193   */
194   void init(PSI_mutex_key key_LOCK_flush_queue,
195             PSI_mutex_key key_LOCK_sync_queue,
196             PSI_mutex_key key_LOCK_commit_queue, PSI_mutex_key key_LOCK_done,
197             PSI_cond_key key_COND_done);
198 
199   /**
200     Deinitializes m_stage_cond_binlog, m_stage_cond_commit_order,
201     m_stage_cond_leader condition variables and m_lock_done mutex.
202   */
203   void deinit();
204 
205   /**
206     Enroll a set of sessions for a stage.
207 
208     This will queue the session thread for writing and flushing.
209 
210     If the thread being queued is assigned as stage leader, it will
211     return immediately.
212 
213     If wait_if_follower is true the thread is not the stage leader,
214     the thread will be wait for the queue to be processed by the
215     leader before it returns.
216     In DBUG-ON version the follower marks is preempt status as ready.
217 
218     The sesssion threads entering this function acquires mutexes, and few of
219     them are not released while exiting based on thread and stage type.
220     - A binlog leader (returning true when stage!=COMMIT_ORDER_FLUSH_STAGE) will
221       acquire the stage mutex in this function and not release it.
222     - A commit order leader of the flush stage (returning true when
223       stage==COMMIT_ORDER_FLUSH_STAGE) will acquire both the stage mutex and the
224       flush queue mutex in this function, and not release them.
225     - A follower (returning false) will release any mutexes it takes, before
226       returning from the function.
227 
228     @param[in] stage Stage identifier for the queue to append to.
229     @param[in] first Queue to append.
230     @param[in] stage_mutex
231                  Pointer to the currently held stage mutex, or nullptr if we're
232                  not in a stage, that will be released when changing stage.
233     @param[in] enter_mutex
234                  Pointer to the mutex that will be taken when changing stage.
235 
236     @retval true  Thread is stage leader.
237     @retval false Thread was not stage leader and processing has been done.
238    */
239   bool enroll_for(StageID stage, THD *first, mysql_mutex_t *stage_mutex,
240                   mysql_mutex_t *enter_mutex);
241 
242   /**
243     Remove first member from the queue for given stage
244 
245     @retval  Returns std::pair<bool, THD *> object.
246              The first boolean value of pair if true determines queue
247              is not empty, and false determines queue is empty.
248              The second value returns the first removed member.
249   */
pop_front(StageID stage)250   std::pair<bool, THD *> pop_front(StageID stage) {
251     return m_queue[stage].pop_front();
252   }
253 
254 #ifndef DBUG_OFF
255   /**
256      The method ensures the follower's execution path can be preempted
257      by the leader's thread.
258      Preempt status of @c head follower is checked to engange the leader
259      into waiting when set.
260 
261      @param head  THD* of a follower thread
262   */
263   void clear_preempt_status(THD *head);
264 #endif
265 
266   /**
267     Fetch the entire queue and empty it. It acquires queue lock before fetching
268     and emptying the queue threads.
269 
270     @param[in]  stage             Stage identifier for the queue to append to.
271 
272     @return Pointer to the first session of the queue.
273   */
274   THD *fetch_queue_acquire_lock(StageID stage);
275 
276   /**
277     Fetch the entire queue and empty it. The caller must acquire queue lock
278     before calling this function.
279 
280     @param[in]  stage             Stage identifier for the queue to append to.
281 
282     @return Pointer to the first session of the queue.
283   */
284   THD *fetch_queue_skip_acquire_lock(StageID stage);
285 
286   /**
287     Introduces a wait operation on the executing thread.  The
288     waiting is done until the timeout elapses or count is
289     reached (whichever comes first).
290 
291     If count == 0, then the session will wait until the timeout
292     elapses. If timeout == 0, then there is no waiting.
293 
294     @param usec     the number of microseconds to wait.
295     @param count    wait for as many as count to join the queue the
296                     session is waiting on
297     @param stage    which stage queue size to compare count against.
298    */
299   void wait_count_or_timeout(ulong count, long usec, StageID stage);
300 
301   /**
302     The function is called after follower thread are processed by leader,
303     to unblock follower threads.
304 
305     @param queue   the thread list which needs to ne unblocked
306     @param stage   Stage identifier current thread belong to.
307   */
308   void signal_done(THD *queue, StageID stage = BINLOG_FLUSH_STAGE);
309 
310   /**
311     This function gets called after transactions are flushed to the engine
312     i.e. after calling ha_flush_logs, to unblock commit order thread list
313     which are not needed to wait for other stages.
314 
315     @param first     the thread list which needs to ne unblocked
316   */
317   void process_final_stage_for_ordered_commit_group(THD *first);
318 
319   /**
320     Wrapper on Mutex_queue lock(), acquires lock on stage queue.
321 
322     @param[in]  stage  Stage identifier for the queue to append to.
323   */
lock_queue(StageID stage)324   void lock_queue(StageID stage) { m_queue[stage].lock(); }
325 
326   /**
327     Wrapper on Mutex_queue unlock(), releases lock on stage queue.
328 
329     @param[in]  stage  Stage identifier for the queue to append to.
330   */
unlock_queue(StageID stage)331   void unlock_queue(StageID stage) { m_queue[stage].unlock(); }
332 
333  private:
334   /** check if Commit_stage_manager variables already initalized. */
335   bool m_is_initialized;
336 
337   /**
338      Queues for sessions.
339 
340      We need four queues:
341      - Binlog flush queue: transactions that are going to be flushed to the
342                            engine and written to the binary log.
343      - Commit order flush queue: transactions that are not going to write the
344                                  binlog at all, but participate in the beginning
345                                  of the group commit, up to and including the
346                                  engine flush.
347      - Sync queue: transactions that are going to be synced to disk
348      - Commit queue: transactions that are going to to be committed
349                      (when binlog_order_commit=1).
350   */
351   Mutex_queue m_queue[STAGE_COUNTER];
352 
353   /**
354      The binlog leader waits on this condition variable till it is indicated
355      to wake up. If binlog flush queue gets first thread in the queue but
356      by then commit order flush queue has already elected leader. The the
357      first thread of binlog queue waits on this condition variable and get
358      signalled to wake up from commit order flush queue leader later.
359   */
360   mysql_cond_t m_stage_cond_leader;
361 
362   /**
363      Condition variable to indicate that the binlog threads can wake up
364      and continue.
365   */
366   mysql_cond_t m_stage_cond_binlog;
367 
368   /**
369      Condition variable to indicate that the flush to storage engine
370      is done and commit order threads can again wake up and continue.
371   */
372   mysql_cond_t m_stage_cond_commit_order;
373 
374   /** Mutex used for the condition variable above */
375   mysql_mutex_t m_lock_done;
376 
377   /** Mutex used for the stage level locks */
378   mysql_mutex_t m_queue_lock[STAGE_COUNTER - 1];
379 
380 #ifndef DBUG_OFF
381   /** Save pointer to leader thread which is used later to awake leader */
382   THD *leader_thd;
383 
384   /** Flag is set by Leader when it starts waiting for follower's all-clear */
385   bool leader_await_preempt_status;
386 
387   /** Condition variable to indicate a follower started waiting for commit */
388   mysql_cond_t m_cond_preempt;
389 #endif
390 };
391 
392 #endif /*RPL_COMMIT_STAGE_MANAGER*/
393