1 /* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. 2 3 This program is free software; you can redistribute it and/or modify 4 it under the terms of the GNU General Public License, version 2.0, 5 as published by the Free Software Foundation. 6 7 This program is also distributed with certain software (including 8 but not limited to OpenSSL) that is licensed under separate terms, 9 as designated in a particular file or component or in included license 10 documentation. The authors of MySQL hereby grant you an additional 11 permission to link the program and your derivative works with the 12 separately licensed software that they have included with MySQL. 13 14 This program is distributed in the hope that it will be useful, 15 but WITHOUT ANY WARRANTY; without even the implied warranty of 16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17 GNU General Public License, version 2.0, for more details. 18 19 You should have received a copy of the GNU General Public License 20 along with this program; if not, write to the Free Software 21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ 22 23 #ifndef RPL_COMMIT_STAGE_MANAGER 24 #define RPL_COMMIT_STAGE_MANAGER 25 26 #include <atomic> 27 #include <utility> 28 29 #include "my_dbug.h" 30 #include "mysql/psi/mysql_cond.h" 31 #include "mysql/psi/mysql_mutex.h" 32 #include "sql/sql_class.h" 33 #include "thr_mutex.h" 34 35 class THD; 36 37 /** 38 Class for maintaining the commit stages for binary log group commit. 39 */ 40 class Commit_stage_manager { 41 public: 42 class Mutex_queue { 43 friend class Commit_stage_manager; 44 45 public: Mutex_queue()46 Mutex_queue() : m_first(nullptr), m_last(&m_first), m_size(0) {} 47 init(mysql_mutex_t * lock)48 void init(mysql_mutex_t *lock) { m_lock = lock; } 49 is_empty()50 bool is_empty() const { return m_first == nullptr; } 51 52 /** 53 Append a linked list of threads to the queue. 54 55 @param[in] first Linked list of threads to be appended to queue 56 57 @retval true The queue was empty before this operation. 58 @retval false The queue was non-empty before this operation. 59 */ 60 bool append(THD *first); 61 62 /** 63 Fetch the entire queue for a stage. It is a wrapper over 64 fetch_and_empty() and acquires queue lock before fetching 65 and emptying the queue threads. 66 67 @return Pointer to the first session of the queue. 68 */ 69 THD *fetch_and_empty_acquire_lock(); 70 71 /** 72 Fetch the entire queue for a stage. It is a wrapper over 73 fetch_and_empty(). The caller must acquire queue lock before 74 calling this function. 75 76 @return Pointer to the first session of the queue. 77 */ 78 THD *fetch_and_empty_skip_acquire_lock(); 79 80 /** 81 Remove first member from the queue 82 83 @retval Returns std::pair<bool, THD *> object. 84 The first boolean value of pair if true determines queue 85 is not empty, and false determines queue is empty. 86 The second value returns the first removed member. 87 */ 88 std::pair<bool, THD *> pop_front(); 89 90 /** 91 Get number of elements in the queue. 92 93 @retval Returns number of element in the queue. 94 */ get_size()95 inline int32 get_size() { return m_size.load(); } 96 97 /** 98 Fetch the first thread of the queue. 99 100 @return first thread of the queue. 101 */ get_leader()102 THD *get_leader() { return m_first; } 103 lock()104 void lock() { 105 mysql_mutex_assert_not_owner(m_lock); 106 mysql_mutex_lock(m_lock); 107 } 108 unlock()109 void unlock() { mysql_mutex_unlock(m_lock); } 110 assert_owner()111 void assert_owner() { mysql_mutex_assert_owner(m_lock); } 112 113 private: 114 /** 115 Fetch the entire queue for a stage. 116 117 @retval This will fetch the entire queue in one go. 118 */ 119 THD *fetch_and_empty(); 120 121 /** 122 Pointer to the first thread in the queue, or nullptr if the queue is 123 empty. 124 */ 125 THD *m_first; 126 127 /** 128 Pointer to the location holding the end of the queue. 129 130 This is either @c &first, or a pointer to the @c next_to_commit of 131 the last thread that is enqueued. 132 */ 133 THD **m_last; 134 135 /** size of the queue */ 136 std::atomic<int32> m_size; 137 138 /** Lock for protecting the queue. */ 139 mysql_mutex_t *m_lock; 140 141 /* 142 This attribute did not have the desired effect, at least not according 143 to -fsanitize=undefined with gcc 5.2.1 144 */ 145 }; // MY_ATTRIBUTE((aligned(CPU_LEVEL1_DCACHE_LINESIZE))); 146 147 private: Commit_stage_manager()148 Commit_stage_manager() : m_is_initialized(false) {} 149 150 Commit_stage_manager(const Commit_stage_manager &) = delete; 151 152 const Commit_stage_manager &operator=(const Commit_stage_manager &) = delete; 153 154 public: 155 /** 156 Fetch Commit_stage_manager class instance. 157 158 @return Reference to the Commit_stage_manager class instance. 159 */ 160 static Commit_stage_manager &get_instance(); 161 162 /** 163 Constants for queues for different stages. 164 */ 165 enum StageID { 166 BINLOG_FLUSH_STAGE, 167 SYNC_STAGE, 168 COMMIT_STAGE, 169 COMMIT_ORDER_FLUSH_STAGE, 170 STAGE_COUNTER 171 }; 172 173 /** 174 Initializes m_stage_cond_binlog, m_stage_cond_commit_order, 175 m_stage_cond_leader condition variables and m_lock_done mutex. 176 177 The binlog follower threads blocks on m_stage_cond_binlog condition 178 variable till signalled to wake up from leader thread. And similarly 179 commit order follower threads blocks on m_stage_cond_commit_order 180 condition variable till signalled to wake up from leader thread. 181 182 The first binlog thread supposed to be leader finds that commit order queue 183 is not empty then it blocks on m_stage_cond_leader till commit order leader 184 signals it to awake and become new leader. 185 186 m_lock_done mutex is shared by all three stages. 187 188 @param key_LOCK_flush_queue mutex instrumentation key 189 @param key_LOCK_sync_queue mutex instrumentation key 190 @param key_LOCK_commit_queue mutex instrumentation key 191 @param key_LOCK_done mutex instrumentation key 192 @param key_COND_done cond instrumentation key 193 */ 194 void init(PSI_mutex_key key_LOCK_flush_queue, 195 PSI_mutex_key key_LOCK_sync_queue, 196 PSI_mutex_key key_LOCK_commit_queue, PSI_mutex_key key_LOCK_done, 197 PSI_cond_key key_COND_done); 198 199 /** 200 Deinitializes m_stage_cond_binlog, m_stage_cond_commit_order, 201 m_stage_cond_leader condition variables and m_lock_done mutex. 202 */ 203 void deinit(); 204 205 /** 206 Enroll a set of sessions for a stage. 207 208 This will queue the session thread for writing and flushing. 209 210 If the thread being queued is assigned as stage leader, it will 211 return immediately. 212 213 If wait_if_follower is true the thread is not the stage leader, 214 the thread will be wait for the queue to be processed by the 215 leader before it returns. 216 In DBUG-ON version the follower marks is preempt status as ready. 217 218 The sesssion threads entering this function acquires mutexes, and few of 219 them are not released while exiting based on thread and stage type. 220 - A binlog leader (returning true when stage!=COMMIT_ORDER_FLUSH_STAGE) will 221 acquire the stage mutex in this function and not release it. 222 - A commit order leader of the flush stage (returning true when 223 stage==COMMIT_ORDER_FLUSH_STAGE) will acquire both the stage mutex and the 224 flush queue mutex in this function, and not release them. 225 - A follower (returning false) will release any mutexes it takes, before 226 returning from the function. 227 228 @param[in] stage Stage identifier for the queue to append to. 229 @param[in] first Queue to append. 230 @param[in] stage_mutex 231 Pointer to the currently held stage mutex, or nullptr if we're 232 not in a stage, that will be released when changing stage. 233 @param[in] enter_mutex 234 Pointer to the mutex that will be taken when changing stage. 235 236 @retval true Thread is stage leader. 237 @retval false Thread was not stage leader and processing has been done. 238 */ 239 bool enroll_for(StageID stage, THD *first, mysql_mutex_t *stage_mutex, 240 mysql_mutex_t *enter_mutex); 241 242 /** 243 Remove first member from the queue for given stage 244 245 @retval Returns std::pair<bool, THD *> object. 246 The first boolean value of pair if true determines queue 247 is not empty, and false determines queue is empty. 248 The second value returns the first removed member. 249 */ pop_front(StageID stage)250 std::pair<bool, THD *> pop_front(StageID stage) { 251 return m_queue[stage].pop_front(); 252 } 253 254 #ifndef DBUG_OFF 255 /** 256 The method ensures the follower's execution path can be preempted 257 by the leader's thread. 258 Preempt status of @c head follower is checked to engange the leader 259 into waiting when set. 260 261 @param head THD* of a follower thread 262 */ 263 void clear_preempt_status(THD *head); 264 #endif 265 266 /** 267 Fetch the entire queue and empty it. It acquires queue lock before fetching 268 and emptying the queue threads. 269 270 @param[in] stage Stage identifier for the queue to append to. 271 272 @return Pointer to the first session of the queue. 273 */ 274 THD *fetch_queue_acquire_lock(StageID stage); 275 276 /** 277 Fetch the entire queue and empty it. The caller must acquire queue lock 278 before calling this function. 279 280 @param[in] stage Stage identifier for the queue to append to. 281 282 @return Pointer to the first session of the queue. 283 */ 284 THD *fetch_queue_skip_acquire_lock(StageID stage); 285 286 /** 287 Introduces a wait operation on the executing thread. The 288 waiting is done until the timeout elapses or count is 289 reached (whichever comes first). 290 291 If count == 0, then the session will wait until the timeout 292 elapses. If timeout == 0, then there is no waiting. 293 294 @param usec the number of microseconds to wait. 295 @param count wait for as many as count to join the queue the 296 session is waiting on 297 @param stage which stage queue size to compare count against. 298 */ 299 void wait_count_or_timeout(ulong count, long usec, StageID stage); 300 301 /** 302 The function is called after follower thread are processed by leader, 303 to unblock follower threads. 304 305 @param queue the thread list which needs to ne unblocked 306 @param stage Stage identifier current thread belong to. 307 */ 308 void signal_done(THD *queue, StageID stage = BINLOG_FLUSH_STAGE); 309 310 /** 311 This function gets called after transactions are flushed to the engine 312 i.e. after calling ha_flush_logs, to unblock commit order thread list 313 which are not needed to wait for other stages. 314 315 @param first the thread list which needs to ne unblocked 316 */ 317 void process_final_stage_for_ordered_commit_group(THD *first); 318 319 /** 320 Wrapper on Mutex_queue lock(), acquires lock on stage queue. 321 322 @param[in] stage Stage identifier for the queue to append to. 323 */ lock_queue(StageID stage)324 void lock_queue(StageID stage) { m_queue[stage].lock(); } 325 326 /** 327 Wrapper on Mutex_queue unlock(), releases lock on stage queue. 328 329 @param[in] stage Stage identifier for the queue to append to. 330 */ unlock_queue(StageID stage)331 void unlock_queue(StageID stage) { m_queue[stage].unlock(); } 332 333 private: 334 /** check if Commit_stage_manager variables already initalized. */ 335 bool m_is_initialized; 336 337 /** 338 Queues for sessions. 339 340 We need four queues: 341 - Binlog flush queue: transactions that are going to be flushed to the 342 engine and written to the binary log. 343 - Commit order flush queue: transactions that are not going to write the 344 binlog at all, but participate in the beginning 345 of the group commit, up to and including the 346 engine flush. 347 - Sync queue: transactions that are going to be synced to disk 348 - Commit queue: transactions that are going to to be committed 349 (when binlog_order_commit=1). 350 */ 351 Mutex_queue m_queue[STAGE_COUNTER]; 352 353 /** 354 The binlog leader waits on this condition variable till it is indicated 355 to wake up. If binlog flush queue gets first thread in the queue but 356 by then commit order flush queue has already elected leader. The the 357 first thread of binlog queue waits on this condition variable and get 358 signalled to wake up from commit order flush queue leader later. 359 */ 360 mysql_cond_t m_stage_cond_leader; 361 362 /** 363 Condition variable to indicate that the binlog threads can wake up 364 and continue. 365 */ 366 mysql_cond_t m_stage_cond_binlog; 367 368 /** 369 Condition variable to indicate that the flush to storage engine 370 is done and commit order threads can again wake up and continue. 371 */ 372 mysql_cond_t m_stage_cond_commit_order; 373 374 /** Mutex used for the condition variable above */ 375 mysql_mutex_t m_lock_done; 376 377 /** Mutex used for the stage level locks */ 378 mysql_mutex_t m_queue_lock[STAGE_COUNTER - 1]; 379 380 #ifndef DBUG_OFF 381 /** Save pointer to leader thread which is used later to awake leader */ 382 THD *leader_thd; 383 384 /** Flag is set by Leader when it starts waiting for follower's all-clear */ 385 bool leader_await_preempt_status; 386 387 /** Condition variable to indicate a follower started waiting for commit */ 388 mysql_cond_t m_cond_preempt; 389 #endif 390 }; 391 392 #endif /*RPL_COMMIT_STAGE_MANAGER*/ 393