#ifndef RPL_PARALLEL_H
#define RPL_PARALLEL_H

#include "log_event.h"


struct rpl_parallel;
struct rpl_parallel_entry;
struct rpl_parallel_thread_pool;

class Relay_log_info;
struct inuse_relaylog;


/*
  Structure used to keep track of the parallel replication of a batch of
  event-groups that group-committed together on the master.

  It is used to ensure that every event group in one batch has reached the
  commit stage before the next batch starts executing.

  Note the lifetime of this structure:

   - It is allocated when the first event in a new batch of group commits
     is queued, from the free list rpl_parallel_entry::gco_free_list.

   - The gco for the batch currently being queued is owned by
     rpl_parallel_entry::current_gco. The gco for a previous batch that has
     been fully queued is owned by the gco->prev_gco pointer of the gco for
     the following batch.

   - The worker thread waits on gco->COND_group_commit_orderer for
     rpl_parallel_entry::count_committing_event_groups to reach wait_count
     before starting; the first waiter links the gco into the next_gco
     pointer of the gco of the previous batch, for signalling.

   - When an event group reaches the commit stage, it signals the
     COND_group_commit_orderer if its gco->next_gco pointer is non-NULL and
     rpl_parallel_entry::count_committing_event_groups has reached
     gco->next_gco->wait_count.

   - The gco lives until all its event groups have completed their commit.
     This is detected by rpl_parallel_entry::last_committed_sub_id being
     greater than or equal to gco->last_sub_id. Once this happens, the gco is
     freed. Note that since updates of last_committed_sub_id can happen
     out-of-order, the thread that frees a given gco can be one executing any
     later event group, not necessarily an event group from the gco being
     freed.

  A sketch of this wait/signal protocol follows the struct definition below.
*/
struct group_commit_orderer {
  /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */
  mysql_cond_t COND_group_commit_orderer;
  uint64 wait_count;
  group_commit_orderer *prev_gco;
  group_commit_orderer *next_gco;
  /*
    The sub_id of the last event group in the previous GCO.
    Only valid if prev_gco != NULL.
  */
  uint64 prior_sub_id;
  /*
    The sub_id of the last event group in this GCO. Only valid when next_gco
    is non-NULL.
  */
  uint64 last_sub_id;
  /*
    This flag is set when this GCO has been installed into the next_gco pointer
    of the previous GCO.
  */
  bool installed;

  enum force_switch_bits
  {
    /*
      This flag is set for a GCO in which we have event groups with multiple
      different commit_id values from the master. This happens when we
      optimistically try to execute in parallel transactions not known to be
      conflict-free.

      When this flag is set, in case of DDL we need to start a new GCO
      regardless of the current commit_id, as DDL is not safe to
      speculatively apply in parallel with prior event groups.
    */
    MULTI_BATCH= 1,
    /*
      This flag is set for a GCO that contains DDL. If set, it forces
      a switch to a new GCO upon seeing a new commit_id, as DDL is not
      safe to speculatively replicate in parallel with subsequent
      transactions.
    */
    FORCE_SWITCH= 2
  };
  uint8 flags;
};
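
/*
  Illustrative sketch (not part of the original header) of the wait/signal
  protocol described in the comment above. A worker executing an event group
  from this GCO waits, under LOCK_parallel_entry, until every event group of
  the previous batch has reached the commit stage; the helper name is
  hypothetical:

    void wait_for_prior_batch(rpl_parallel_entry *e, group_commit_orderer *gco)
    {
      mysql_mutex_assert_owner(&e->LOCK_parallel_entry);
      while (e->count_committing_event_groups < gco->wait_count)
        mysql_cond_wait(&gco->COND_group_commit_orderer,
                        &e->LOCK_parallel_entry);
    }

  Correspondingly, when an event group reaches the commit stage it increments
  count_committing_event_groups and, if gco->next_gco is non-NULL and the
  count has reached gco->next_gco->wait_count, broadcasts
  gco->next_gco->COND_group_commit_orderer to wake the waiters.
*/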


struct rpl_parallel_thread {
  bool delay_start;
  bool running;
  bool stop;
  bool pause_for_ftwrl;
  mysql_mutex_t LOCK_rpl_thread;
  mysql_cond_t COND_rpl_thread;
  mysql_cond_t COND_rpl_thread_queue;
  mysql_cond_t COND_rpl_thread_stop;
  struct rpl_parallel_thread *next;             /* For free list. */
  struct rpl_parallel_thread_pool *pool;
  THD *thd;
  /*
    Who owns the thread, if any (it's a pointer into the
    rpl_parallel_entry::rpl_threads array).
  */
  struct rpl_parallel_thread **current_owner;
  /* The rpl_parallel_entry of the owner. */
  rpl_parallel_entry *current_entry;
  struct queued_event {
    queued_event *next;
    /*
      queued_event can hold either an event to be executed, or just a binlog
      position to be updated without any associated event.
    */
    enum queued_event_t {
      QUEUED_EVENT,
      QUEUED_POS_UPDATE,
      QUEUED_MASTER_RESTART
    } typ;
    union {
      Log_event *ev;                            /* QUEUED_EVENT */
      rpl_parallel_entry *entry_for_queued;     /* QUEUED_POS_UPDATE and
                                                   QUEUED_MASTER_RESTART */
    };
    rpl_group_info *rgi;
    inuse_relaylog *ir;
    ulonglong future_event_relay_log_pos;
    char event_relay_log_name[FN_REFLEN];
    char future_event_master_log_name[FN_REFLEN];
    ulonglong event_relay_log_pos;
    my_off_t future_event_master_log_pos;
    size_t event_size;
  } *event_queue, *last_in_queue;
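  /*
    Illustrative sketch (not part of the original header) of dispatching on
    queued_event::typ, per the comment above; the handling code is only a
    placeholder:

      switch (qev->typ)
      {
      case queued_event::QUEUED_EVENT:
        // Apply qev->ev on the slave.
        break;
      case queued_event::QUEUED_POS_UPDATE:
      case queued_event::QUEUED_MASTER_RESTART:
        // No event to apply; only act on qev->entry_for_queued (e.g. update
        // the binlog position).
        break;
      }
  */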
  uint64 queued_size;
  /* These free lists are protected by LOCK_rpl_thread. */
  queued_event *qev_free_list;
  rpl_group_info *rgi_free_list;
  group_commit_orderer *gco_free_list;
  /*
    These free lists are local to the thread, so need not be protected by any
    lock. They are moved to the global free lists in batches in the function
    batch_free(), to reduce LOCK_rpl_thread contention.

    The lists are not NULL-terminated (as we do not need to traverse them).
    Instead, if they are non-NULL, the loc_XXX_last_ptr_ptr points to the
    `next' pointer of the last element, which is used to link into the front
    of the global freelists.
  */
  queued_event *loc_qev_list, **loc_qev_last_ptr_ptr;
  size_t loc_qev_size;
  uint64 qev_free_pending;
  rpl_group_info *loc_rgi_list, **loc_rgi_last_ptr_ptr;
  group_commit_orderer *loc_gco_list, **loc_gco_last_ptr_ptr;
  /* These keep track of batch update of inuse_relaylog refcounts. */
  inuse_relaylog *accumulated_ir_last;
  uint64 accumulated_ir_count;

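  /*
    Illustrative sketch (not part of the original header) of how a local,
    non-NULL-terminated free list above can be spliced onto the front of the
    corresponding global list, using the saved pointer to the last element's
    `next' field. Assumes LOCK_rpl_thread is held; the helper is hypothetical:

      void splice_loc_qev_list()
      {
        if (loc_qev_list)
        {
          // Point the last local element's `next' at the current global
          // head, then install the local head as the new global head.
          *loc_qev_last_ptr_ptr= qev_free_list;
          qev_free_list= loc_qev_list;
          loc_qev_list= NULL;
        }
      }

    batch_free() performs this kind of splice for the qev, rgi and gco lists.
  */
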
  void enqueue(queued_event *qev)
  {
    if (last_in_queue)
      last_in_queue->next= qev;
    else
      event_queue= qev;
    last_in_queue= qev;
    queued_size+= qev->event_size;
  }

  void dequeue1(queued_event *list)
  {
    DBUG_ASSERT(list == event_queue);
    event_queue= last_in_queue= NULL;
  }

  void dequeue2(size_t dequeue_size)
  {
    queued_size-= dequeue_size;
  }

  queued_event *get_qev_common(Log_event *ev, ulonglong event_size);
  queued_event *get_qev(Log_event *ev, ulonglong event_size,
                        Relay_log_info *rli);
  queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev,
                              const char *relay_log_name,
                              ulonglong event_pos, ulonglong event_size);
  /*
    Put a qev on the local free list, to be later released to the global free
    list by batch_free().
  */
  void loc_free_qev(queued_event *qev);
  /*
    Release a qev immediately to the global free list. Requires holding the
    LOCK_rpl_thread mutex.
  */
  void free_qev(queued_event *qev);
  rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
                          rpl_parallel_entry *e, ulonglong event_size);
  /*
    Put an rgi on the local free list, to be later released to the global free
    list by batch_free().
  */
  void loc_free_rgi(rpl_group_info *rgi);
  /*
    Release an rgi immediately to the global free list. Requires holding the
    LOCK_rpl_thread mutex.
  */
  void free_rgi(rpl_group_info *rgi);
  group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev,
                                uint64 first_sub_id);
  /*
    Put a gco on the local free list, to be later released to the global free
    list by batch_free().
  */
  void loc_free_gco(group_commit_orderer *gco);
  /*
    Move all local free lists to the global ones. Requires holding
    LOCK_rpl_thread.
  */
  void batch_free();
  /* Update inuse_relaylog refcounts with what we have accumulated so far. */
  void inuse_relaylog_refcount_update();
};


struct rpl_parallel_thread_pool {
  struct rpl_parallel_thread **threads;
  struct rpl_parallel_thread *free_list;
  mysql_mutex_t LOCK_rpl_thread_pool;
  mysql_cond_t COND_rpl_thread_pool;
  uint32 count;
  bool inited;
  /*
    While FTWRL runs, this flag is set to make the SQL thread or
    STOP/START SLAVE not try to start new activity while that operation
    is in progress.
  */
  bool busy;

  rpl_parallel_thread_pool();
  int init(uint32 size);
  void destroy();
  void deactivate();
  void destroy_cond_mutex();
  struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner,
                                         rpl_parallel_entry *entry);
  void release_thread(rpl_parallel_thread *rpt);
};


struct rpl_parallel_entry {
  mysql_mutex_t LOCK_parallel_entry;
  mysql_cond_t COND_parallel_entry;
  uint32 domain_id;
  /*
    Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show
    that they are waiting, so that finish_event_group knows to signal them
    when last_committed_sub_id is increased.
  */
  uint32 need_sub_id_signal;
  uint64 last_commit_id;
  bool active;
  /*
    Set when the SQL thread is shutting down, and no more events can be
    processed, so worker threads must force abort any current transactions
    without waiting for event groups to complete.
  */
  bool force_abort;
  /*
    At STOP SLAVE (force_abort=true), we do not want to process all events in
    the queue (which could unnecessarily delay stop, if a lot of events happen
    to be queued). The stop_count provides a safe point at which to stop, so
    that everything before becomes committed and nothing after does. The value
    corresponds to group_commit_orderer::wait_count; if wait_count is less than
    or equal to stop_count, we execute the associated event group, else we
    skip it (and all following) and stop.
  */
  uint64 stop_count;
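
  /*
    Illustrative sketch (not part of the original header) of the stop_count
    rule above, as seen by a worker inspecting its GCO under
    LOCK_parallel_entry; the helper is hypothetical:

      bool should_skip_at_stop(const rpl_parallel_entry *e,
                               const group_commit_orderer *gco)
      {
        // Event groups whose wait_count is past the recorded stop point are
        // skipped (together with everything after them) so that STOP SLAVE
        // completes promptly.
        return e->force_abort && gco->wait_count > e->stop_count;
      }
  */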

  /*
    Cyclic array recording the last rpl_thread_max worker threads that we
    queued events for. This is used to limit how many workers a single domain
    can occupy (--slave-domain-parallel-threads).

    Note that workers are never explicitly deleted from the array. Instead,
    we need to check (under LOCK_rpl_thread) that the thread still belongs
    to us before re-using it (rpl_parallel_thread::current_owner).
  */
  rpl_parallel_thread **rpl_threads;
  uint32 rpl_thread_max;
  uint32 rpl_thread_idx;
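  /*
    Illustrative sketch (not part of the original header) of the ownership
    re-check described above when re-using a slot in rpl_threads; the local
    variable names are only placeholders:

      rpl_parallel_thread *rpt= rpl_threads[idx];
      if (rpt)
      {
        mysql_mutex_lock(&rpt->LOCK_rpl_thread);
        // The slot may be stale: the pool can have handed this thread to a
        // different owner since we last queued for it. Only re-use it if it
        // still points back at our slot.
        if (rpt->current_owner != &rpl_threads[idx])
        {
          mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
          rpt= NULL;              /* Must get a fresh thread from the pool. */
        }
      }
  */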
  /*
    The sub_id of the last transaction to commit within this domain_id.
    Must be accessed under LOCK_parallel_entry protection.

    Event groups commit in order, so the rpl_group_info for an event group
    will be alive (at least) as long as
    rpl_group_info::gtid_sub_id > last_committed_sub_id. This can be used to
    safely refer back to previous event groups if they are still executing,
    and ignore them if they completed, without requiring explicit
    synchronisation between the threads.
  */
  uint64 last_committed_sub_id;
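  /*
    Illustrative sketch (not part of the original header) of the liveness rule
    above: whether a prior event group's rpl_group_info may still be referenced
    can be decided from its recorded sub_id alone; the helper is hypothetical:

      // Assumes LOCK_parallel_entry is held.
      bool prior_group_still_executing(uint64 prior_sub_id) const
      {
        // If the prior group's sub_id is <= last_committed_sub_id, it has
        // already committed and its rpl_group_info may have been freed.
        return prior_sub_id > last_committed_sub_id;
      }
  */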
  /*
    The sub_id of the last event group in this replication domain that was
    queued for execution by a worker thread.
  */
  uint64 current_sub_id;
  /*
    The largest sub_id that has started its transaction. Protected by
    LOCK_parallel_entry.

    (Transactions can start out-of-order, so this value signifies that no
    transactions with larger sub_id have started, but not necessarily that all
    transactions with smaller sub_id have started).
  */
  uint64 largest_started_sub_id;
  rpl_group_info *current_group_info;
  /*
    If we get an error in some event group, we set the sub_id of that event
    group here. Then later event groups (with higher sub_id) can know not to
    try to start (event groups that already started will be rolled back when
    wait_for_prior_commit() returns an error).
    The value is ULONGLONG_MAX when no error occurred.
  */
  uint64 stop_on_error_sub_id;
  /*
    During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than
    this value must not start, but wait until the global read lock is released.
    The value is set to ULONGLONG_MAX when no FTWRL is pending.
  */
  uint64 pause_sub_id;
  /* Total count of event groups queued so far. */
  uint64 count_queued_event_groups;
  /*
    Count of event groups that have started (but not necessarily completed)
    the commit phase. We use this to know when every event group in a previous
    batch of master group commits has started committing on the slave, so
    that it is safe to start executing the events in the following batch.
  */
  uint64 count_committing_event_groups;
  /* The group_commit_orderer object for the events currently being queued. */
  group_commit_orderer *current_gco;

  rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
                                      PSI_stage_info *old_stage, bool reuse);
  int queue_master_restart(rpl_group_info *rgi,
                           Format_description_log_event *fdev);
};


struct rpl_parallel {
  HASH domain_hash;
  rpl_parallel_entry *current;
  bool sql_thread_stopping;

  rpl_parallel();
  ~rpl_parallel();
  void reset();
  rpl_parallel_entry *find(uint32 domain_id);
  void wait_for_done(THD *thd, Relay_log_info *rli);
  void stop_during_until();
  bool workers_idle();
  int wait_for_workers_idle(THD *thd);
  int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size);
};


extern struct rpl_parallel_thread_pool global_rpl_thread_pool;


extern int rpl_parallel_resize_pool_if_no_slaves(void);
extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
extern int rpl_pause_for_ftwrl(THD *thd);
extern void rpl_unpause_after_ftwrl(THD *thd);

#endif  /* RPL_PARALLEL_H */