1 #ifndef RPL_PARALLEL_H 2 #define RPL_PARALLEL_H 3 4 #include "log_event.h" 5 6 7 struct rpl_parallel; 8 struct rpl_parallel_entry; 9 struct rpl_parallel_thread_pool; 10 11 class Relay_log_info; 12 struct inuse_relaylog; 13 14 15 /* 16 Structure used to keep track of the parallel replication of a batch of 17 event-groups that group-committed together on the master. 18 19 It is used to ensure that every event group in one batch has reached the 20 commit stage before the next batch starts executing. 21 22 Note the lifetime of this structure: 23 24 - It is allocated when the first event in a new batch of group commits 25 is queued, from the free list rpl_parallel_entry::gco_free_list. 26 27 - The gco for the batch currently being queued is owned by 28 rpl_parallel_entry::current_gco. The gco for a previous batch that has 29 been fully queued is owned by the gco->prev_gco pointer of the gco for 30 the following batch. 31 32 - The worker thread waits on gco->COND_group_commit_orderer for 33 rpl_parallel_entry::count_committing_event_groups to reach wait_count 34 before starting; the first waiter links the gco into the next_gco 35 pointer of the gco of the previous batch for signalling. 36 37 - When an event group reaches the commit stage, it signals the 38 COND_group_commit_orderer if its gco->next_gco pointer is non-NULL and 39 rpl_parallel_entry::count_committing_event_groups has reached 40 gco->next_gco->wait_count. 41 42 - The gco lives until all its event groups have completed their commit. 43 This is detected by rpl_parallel_entry::last_committed_sub_id being 44 greater than or equal gco->last_sub_id. Once this happens, the gco is 45 freed. Note that since update of last_committed_sub_id can happen 46 out-of-order, the thread that frees a given gco can be for any later 47 event group, not necessarily an event group from the gco being freed. 48 */ 49 struct group_commit_orderer { 50 /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */ 51 mysql_cond_t COND_group_commit_orderer; 52 uint64 wait_count; 53 group_commit_orderer *prev_gco; 54 group_commit_orderer *next_gco; 55 /* 56 The sub_id of last event group in the previous GCO. 57 Only valid if prev_gco != NULL. 58 */ 59 uint64 prior_sub_id; 60 /* 61 The sub_id of the last event group in this GCO. Only valid when next_gco 62 is non-NULL. 63 */ 64 uint64 last_sub_id; 65 /* 66 This flag is set when this GCO has been installed into the next_gco pointer 67 of the previous GCO. 68 */ 69 bool installed; 70 71 enum force_switch_bits 72 { 73 /* 74 This flag is set for a GCO in which we have event groups with multiple 75 different commit_id values from the master. This happens when we 76 optimistically try to execute in parallel transactions not known to be 77 conflict-free. 78 79 When this flag is set, in case of DDL we need to start a new GCO 80 regardless of current commit_id, as DDL is not safe to 81 speculatively apply in parallel with prior event groups. 82 */ 83 MULTI_BATCH= 1, 84 /* 85 This flag is set for a GCO that contains DDL. If set, it forces 86 a switch to a new GCO upon seeing a new commit_id, as DDL is not 87 safe to speculatively replicate in parallel with subsequent 88 transactions. 89 */ 90 FORCE_SWITCH= 2 91 }; 92 uint8 flags; 93 }; 94 95 96 struct rpl_parallel_thread { 97 bool delay_start; 98 bool running; 99 bool stop; 100 bool pause_for_ftwrl; 101 mysql_mutex_t LOCK_rpl_thread; 102 mysql_cond_t COND_rpl_thread; 103 mysql_cond_t COND_rpl_thread_queue; 104 mysql_cond_t COND_rpl_thread_stop; 105 struct rpl_parallel_thread *next; /* For free list. */ 106 struct rpl_parallel_thread_pool *pool; 107 THD *thd; 108 /* 109 Who owns the thread, if any (it's a pointer into the 110 rpl_parallel_entry::rpl_threads array. 111 */ 112 struct rpl_parallel_thread **current_owner; 113 /* The rpl_parallel_entry of the owner. */ 114 rpl_parallel_entry *current_entry; 115 struct queued_event { 116 queued_event *next; 117 /* 118 queued_event can hold either an event to be executed, or just a binlog 119 position to be updated without any associated event. 120 */ 121 enum queued_event_t { 122 QUEUED_EVENT, 123 QUEUED_POS_UPDATE, 124 QUEUED_MASTER_RESTART 125 } typ; 126 union { 127 Log_event *ev; /* QUEUED_EVENT */ 128 rpl_parallel_entry *entry_for_queued; /* QUEUED_POS_UPDATE and 129 QUEUED_MASTER_RESTART */ 130 }; 131 rpl_group_info *rgi; 132 inuse_relaylog *ir; 133 ulonglong future_event_relay_log_pos; 134 char event_relay_log_name[FN_REFLEN]; 135 char future_event_master_log_name[FN_REFLEN]; 136 ulonglong event_relay_log_pos; 137 my_off_t future_event_master_log_pos; 138 size_t event_size; 139 } *event_queue, *last_in_queue; 140 uint64 queued_size; 141 /* These free lists are protected by LOCK_rpl_thread. */ 142 queued_event *qev_free_list; 143 rpl_group_info *rgi_free_list; 144 group_commit_orderer *gco_free_list; 145 /* 146 These free lists are local to the thread, so need not be protected by any 147 lock. They are moved to the global free lists in batches in the function 148 batch_free(), to reduce LOCK_rpl_thread contention. 149 150 The lists are not NULL-terminated (as we do not need to traverse them). 151 Instead, if they are non-NULL, the loc_XXX_last_ptr_ptr points to the 152 `next' pointer of the last element, which is used to link into the front 153 of the global freelists. 154 */ 155 queued_event *loc_qev_list, **loc_qev_last_ptr_ptr; 156 size_t loc_qev_size; 157 uint64 qev_free_pending; 158 rpl_group_info *loc_rgi_list, **loc_rgi_last_ptr_ptr; 159 group_commit_orderer *loc_gco_list, **loc_gco_last_ptr_ptr; 160 /* These keep track of batch update of inuse_relaylog refcounts. */ 161 inuse_relaylog *accumulated_ir_last; 162 uint64 accumulated_ir_count; 163 enqueuerpl_parallel_thread164 void enqueue(queued_event *qev) 165 { 166 if (last_in_queue) 167 last_in_queue->next= qev; 168 else 169 event_queue= qev; 170 last_in_queue= qev; 171 queued_size+= qev->event_size; 172 } 173 dequeue1rpl_parallel_thread174 void dequeue1(queued_event *list) 175 { 176 DBUG_ASSERT(list == event_queue); 177 event_queue= last_in_queue= NULL; 178 } 179 dequeue2rpl_parallel_thread180 void dequeue2(size_t dequeue_size) 181 { 182 queued_size-= dequeue_size; 183 } 184 185 queued_event *get_qev_common(Log_event *ev, ulonglong event_size); 186 queued_event *get_qev(Log_event *ev, ulonglong event_size, 187 Relay_log_info *rli); 188 queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev, 189 const char *relay_log_name, 190 ulonglong event_pos, ulonglong event_size); 191 /* 192 Put a qev on the local free list, to be later released to the global free 193 list by batch_free(). 194 */ 195 void loc_free_qev(queued_event *qev); 196 /* 197 Release an rgi immediately to the global free list. Requires holding the 198 LOCK_rpl_thread mutex. 199 */ 200 void free_qev(queued_event *qev); 201 rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev, 202 rpl_parallel_entry *e, ulonglong event_size); 203 /* 204 Put an gco on the local free list, to be later released to the global free 205 list by batch_free(). 206 */ 207 void loc_free_rgi(rpl_group_info *rgi); 208 /* 209 Release an rgi immediately to the global free list. Requires holding the 210 LOCK_rpl_thread mutex. 211 */ 212 void free_rgi(rpl_group_info *rgi); 213 group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev, 214 uint64 first_sub_id); 215 /* 216 Put a gco on the local free list, to be later released to the global free 217 list by batch_free(). 218 */ 219 void loc_free_gco(group_commit_orderer *gco); 220 /* 221 Move all local free lists to the global ones. Requires holding 222 LOCK_rpl_thread. 223 */ 224 void batch_free(); 225 /* Update inuse_relaylog refcounts with what we have accumulated so far. */ 226 void inuse_relaylog_refcount_update(); 227 }; 228 229 230 struct rpl_parallel_thread_pool { 231 struct rpl_parallel_thread **threads; 232 struct rpl_parallel_thread *free_list; 233 mysql_mutex_t LOCK_rpl_thread_pool; 234 mysql_cond_t COND_rpl_thread_pool; 235 uint32 count; 236 bool inited; 237 /* 238 While FTWRL runs, this counter is incremented to make SQL thread or 239 STOP/START slave not try to start new activity while that operation 240 is in progress. 241 */ 242 bool busy; 243 244 rpl_parallel_thread_pool(); 245 int init(uint32 size); 246 void destroy(); 247 void deactivate(); 248 void destroy_cond_mutex(); 249 struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner, 250 rpl_parallel_entry *entry); 251 void release_thread(rpl_parallel_thread *rpt); 252 }; 253 254 255 struct rpl_parallel_entry { 256 mysql_mutex_t LOCK_parallel_entry; 257 mysql_cond_t COND_parallel_entry; 258 uint32 domain_id; 259 /* 260 Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show 261 that they are waiting, so that finish_event_group knows to signal them 262 when last_committed_sub_id is increased. 263 */ 264 uint32 need_sub_id_signal; 265 uint64 last_commit_id; 266 bool active; 267 /* 268 Set when SQL thread is shutting down, and no more events can be processed, 269 so worker threads must force abort any current transactions without 270 waiting for event groups to complete. 271 */ 272 bool force_abort; 273 /* 274 At STOP SLAVE (force_abort=true), we do not want to process all events in 275 the queue (which could unnecessarily delay stop, if a lot of events happen 276 to be queued). The stop_count provides a safe point at which to stop, so 277 that everything before becomes committed and nothing after does. The value 278 corresponds to group_commit_orderer::wait_count; if wait_count is less than 279 or equal to stop_count, we execute the associated event group, else we 280 skip it (and all following) and stop. 281 */ 282 uint64 stop_count; 283 284 /* 285 Cyclic array recording the last rpl_thread_max worker threads that we 286 queued event for. This is used to limit how many workers a single domain 287 can occupy (--slave-domain-parallel-threads). 288 289 Note that workers are never explicitly deleted from the array. Instead, 290 we need to check (under LOCK_rpl_thread) that the thread still belongs 291 to us before re-using (rpl_thread::current_owner). 292 */ 293 rpl_parallel_thread **rpl_threads; 294 uint32 rpl_thread_max; 295 uint32 rpl_thread_idx; 296 /* 297 The sub_id of the last transaction to commit within this domain_id. 298 Must be accessed under LOCK_parallel_entry protection. 299 300 Event groups commit in order, so the rpl_group_info for an event group 301 will be alive (at least) as long as 302 rpl_group_info::gtid_sub_id > last_committed_sub_id. This can be used to 303 safely refer back to previous event groups if they are still executing, 304 and ignore them if they completed, without requiring explicit 305 synchronisation between the threads. 306 */ 307 uint64 last_committed_sub_id; 308 /* 309 The sub_id of the last event group in this replication domain that was 310 queued for execution by a worker thread. 311 */ 312 uint64 current_sub_id; 313 /* 314 The largest sub_id that has started its transaction. Protected by 315 LOCK_parallel_entry. 316 317 (Transactions can start out-of-order, so this value signifies that no 318 transactions with larger sub_id have started, but not necessarily that all 319 transactions with smaller sub_id have started). 320 */ 321 uint64 largest_started_sub_id; 322 rpl_group_info *current_group_info; 323 /* 324 If we get an error in some event group, we set the sub_id of that event 325 group here. Then later event groups (with higher sub_id) can know not to 326 try to start (event groups that already started will be rolled back when 327 wait_for_prior_commit() returns error). 328 The value is ULONGLONG_MAX when no error occurred. 329 */ 330 uint64 stop_on_error_sub_id; 331 /* 332 During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than 333 this value must not start, but wait until the global read lock is released. 334 The value is set to ULONGLONG_MAX when no FTWRL is pending. 335 */ 336 uint64 pause_sub_id; 337 /* Total count of event groups queued so far. */ 338 uint64 count_queued_event_groups; 339 /* 340 Count of event groups that have started (but not necessarily completed) 341 the commit phase. We use this to know when every event group in a previous 342 batch of master group commits have started committing on the slave, so 343 that it is safe to start executing the events in the following batch. 344 */ 345 uint64 count_committing_event_groups; 346 /* The group_commit_orderer object for the events currently being queued. */ 347 group_commit_orderer *current_gco; 348 349 rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond, 350 PSI_stage_info *old_stage, bool reuse); 351 int queue_master_restart(rpl_group_info *rgi, 352 Format_description_log_event *fdev); 353 }; 354 struct rpl_parallel { 355 HASH domain_hash; 356 rpl_parallel_entry *current; 357 bool sql_thread_stopping; 358 359 rpl_parallel(); 360 ~rpl_parallel(); 361 void reset(); 362 rpl_parallel_entry *find(uint32 domain_id); 363 void wait_for_done(THD *thd, Relay_log_info *rli); 364 void stop_during_until(); 365 bool workers_idle(); 366 int wait_for_workers_idle(THD *thd); 367 int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size); 368 }; 369 370 371 extern struct rpl_parallel_thread_pool global_rpl_thread_pool; 372 373 374 extern int rpl_parallel_resize_pool_if_no_slaves(void); 375 extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool); 376 extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool); 377 extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid); 378 extern int rpl_pause_for_ftwrl(THD *thd); 379 extern void rpl_unpause_after_ftwrl(THD *thd); 380 381 #endif /* RPL_PARALLEL_H */ 382