1 /* Copyright (c) 2011, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #ifndef RPL_RLI_PDB_H
24
25 #define RPL_RLI_PDB_H
26
27 #ifdef HAVE_REPLICATION
28
29 #include "my_global.h"
30 #include "my_bitmap.h" // MY_BITMAP
31 #include "prealloced_array.h" // Prealloced_array
32 #include "log_event.h" // Format_description_log_event
33 #include "rpl_mts_submode.h" // enum_mts_parallel_type
34 #include "rpl_rli.h" // Relay_log_info
35 #include "rpl_slave.h" // MTS_WORKER_UNDEF
36
37 #ifndef NDEBUG
38 extern ulong w_rr;
39 #endif
/**
  Legend of abbreviations used throughout the module:

  C  - Coordinator
  CP - checkpoint
  W  - Worker

  B-event  event that Begins a group (a transaction)
  T-event  event that Terminates a group (a transaction)
*/
50
51 /* Assigned Partition Hash (APH) entry */
52 typedef struct st_db_worker_hash_entry
53 {
54 uint db_len;
55 const char *db;
56 Slave_worker *worker;
57 /*
58 The number of transaction pending on this database.
59 This should only be modified under the lock slave_worker_hash_lock.
60 */
61 long usage;
62 /*
63 The list of temp tables belonging to @ db database is
64 attached to an assigned @c worker to become its thd->temporary_tables.
65 The list is updated with every ddl incl CREATE, DROP.
66 It is removed from the entry and merged to the coordinator's
67 thd->temporary_tables in case of events: slave stops, APH oversize.
68 */
69 TABLE* volatile temporary_tables;
70
71 /* todo: relax concurrency to mimic record-level locking.
72 That is to augmenting the entry with mutex/cond pair
73 pthread_mutex_t
74 pthread_cond_t
75 timestamp updated_at; */
76
77 } db_worker_hash_entry;
78
79 bool init_hash_workers(Relay_log_info *rli);
80 void destroy_hash_workers(Relay_log_info*);
81 Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
82 db_worker_hash_entry **ptr_entry,
83 bool need_temp_tables, Slave_worker *w);
84 Slave_worker *get_least_occupied_worker(Relay_log_info *rli,
85 Slave_worker_array *workers,
86 Log_event* ev);
87
88 #define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
89
90 #define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
91
92 typedef struct st_slave_job_group
93 {
94 char *group_master_log_name; // (actually redundant)
95 /*
96 T-event lop_pos filled by Worker for CheckPoint (CP)
97 */
98 my_off_t group_master_log_pos;
99
100 /*
101 When relay-log name changes allocates and fill in a new name of relay-log,
102 otherwise it fills in NULL.
103 Coordinator keeps track of each Worker has been notified on the updating
104 to make sure the routine runs once per change.
105
106 W checks the value at commit and memoriezes a not-NULL.
107 Freeing unless NULL is left to Coordinator at CP.
108 */
109 char *group_relay_log_name; // The value is last seen relay-log
110 my_off_t group_relay_log_pos; // filled by W
111 ulong worker_id;
112 Slave_worker *worker;
113 ulonglong total_seqno;
114
115 my_off_t master_log_pos; // B-event log_pos
116 /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
117 uint checkpoint_seqno;
118 my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
119 char* checkpoint_log_name;
120 my_off_t checkpoint_relay_log_pos; // T-event lop_pos filled by W for CheckPoint
121 char* checkpoint_relay_log_name;
122 int32 done; // Flag raised by W, read and reset by Coordinator
123 ulong shifted; // shift the last CP bitmap at receiving a new CP
124 time_t ts; // Group's timestampt to update Seconds_behind_master
125 #ifndef NDEBUG
126 bool notified; // to debug group_master_log_name change notification
127 #endif
128 /* Clock-based scheduler requirement: */
129 longlong last_committed; // commit parent timestamp
130 longlong sequence_number; // transaction's logical timestamp
131 /*
132 After Coordinator has seen a new FD event, it sets this member to
133 point to the new event, once per worker. Coordinator does so
134 when it schedules a first group following the FD event to a worker.
135 It checks Slave_worker::fd_change_notified flag to decide whether
136 to do this or not.
137 When the worker executes the group, it replaces its currently
138 active FD by the new FD once it takes on the group first event. It
139 checks this member and resets it after the FD replacement is done.
140
141 The member is kind of lock-free. It's updated by Coordinator and
142 read by Worker without holding any mutex. That's still safe thanks
143 to Slave_worker::jobs_lock that works as synchronizer, Worker
144 can't read any stale info.
145 The member is updated by Coordinator when it decides which Worker
146 an event following a new FD is to be scheduled.
147 After Coordinator has chosen a Worker, it queues the event to it
148 with necessarily taking Slave_worker::jobs_lock. The Worker grabs
149 the mutex lock later at pulling the event from the queue and
150 releases the lock before to read from this member.
151
152 This sequence of actions shows the write operation always precedes
153 the read one, and ensures no stale FD info is passed to the
154 Worker.
155 */
156 Format_description_log_event *new_fd_event;
157 /*
158 Coordinator fills the struct with defaults and options at starting of
159 a group distribution.
160 */
resetst_slave_job_group161 void reset(my_off_t master_pos, ulonglong seqno)
162 {
163 master_log_pos= master_pos;
164 group_master_log_pos= group_relay_log_pos= 0;
165 group_master_log_name= NULL; // todo: remove
166 group_relay_log_name= NULL;
167 worker_id= MTS_WORKER_UNDEF;
168 total_seqno= seqno;
169 checkpoint_log_name= NULL;
170 checkpoint_log_pos= 0;
171 checkpoint_relay_log_name= NULL;
172 checkpoint_relay_log_pos= 0;
173 checkpoint_seqno= (uint) -1;
174 done= 0;
175 ts= 0;
176 #ifndef NDEBUG
177 notified= false;
178 #endif
179 last_committed= SEQ_UNINIT;
180 sequence_number= SEQ_UNINIT;
181 new_fd_event= NULL;
182 }
183 } Slave_job_group;
184
185 /**
186 The class defines a type of queue with a predefined max size that is
187 implemented using the circular memory buffer.
188 That is items of the queue are accessed as indexed elements of
189 the array buffer in a way that when the index value reaches
190 a max value it wraps around to point to the first buffer element.
191 */
192 template<typename Element_type>
193 class circular_buffer_queue
194 {
195 public:
196
197 Prealloced_array<Element_type, 1, true> m_Q;
198 ulong size; // the Size of the queue in terms of element
199 ulong avail; // first Available index to append at (next to tail)
200 ulong entry; // the head index or the entry point to the queue.
201 volatile ulong len; // actual length
202 bool inited_queue;
203
circular_buffer_queue(ulong max)204 circular_buffer_queue(ulong max) :
205 m_Q(PSI_INSTRUMENT_ME),
206 size(max), avail(0), entry(max), len(0), inited_queue(false)
207 {
208 if (!m_Q.reserve(size))
209 inited_queue= true;
210 m_Q.resize(size);
211 }
circular_buffer_queue()212 circular_buffer_queue() : m_Q(PSI_INSTRUMENT_ME), inited_queue(false) {}
~circular_buffer_queue()213 ~circular_buffer_queue ()
214 {
215 }
216
217 /**
218 Content of the being dequeued item is copied to the arg-pointer
219 location.
220
221 @param [out] item A pointer to the being dequeued item.
222 @return the queue's array index that the de-queued item
223 located at, or
224 an error encoded in beyond the index legacy range.
225 */
226 ulong de_queue(Element_type *item);
227 /**
228 Similar to de_queue but extracting happens from the tail side.
229
230 @param [out] item A pointer to the being dequeued item.
231 @return the queue's array index that the de-queued item
232 located at, or an error.
233 */
234 ulong de_tail(Element_type *item);
235
236 /**
237 return the index where the arg item locates
238 or an error encoded as a value in beyond of the legacy range
239 [0, size) (value `size' is excluded).
240 */
241 ulong en_queue(Element_type *item);
242 /**
243 return the value of @c data member of the head of the queue.
244 */
head_queue()245 Element_type* head_queue()
246 {
247 if (empty())
248 return NULL;
249 return &m_Q[entry];
250 }
251
252 bool gt(ulong i, ulong k); // comparision of ordering of two entities
253 /* index is within the valid range */
in(ulong k)254 bool in(ulong k) { return !empty() &&
255 (entry > avail ? (k >= entry || k < avail) : (k >= entry && k < avail)); }
empty()256 bool empty() { return entry == size; }
full()257 bool full() { return avail == size; }
258 };
259
260
261 /**
262 Group Assigned Queue whose first element identifies first gap
263 in committed sequence. The head of the queue is therefore next to
264 the low-water-mark.
265 */
266 class Slave_committed_queue : public circular_buffer_queue<Slave_job_group>
267 {
268 public:
269
270 bool inited;
271
272 /* master's Rot-ev exec */
273 void update_current_binlog(const char *post_rotate);
274
275 /*
276 The last checkpoint time Low-Water-Mark
277 */
278 Slave_job_group lwm;
279
280 /* last time processed indexes for each worker */
281 Prealloced_array<ulonglong, 1> last_done;
282
283 /* the being assigned group index in GAQ */
284 ulong assigned_group_index;
285
Slave_committed_queue(const char * log,ulong max,uint n)286 Slave_committed_queue (const char *log, ulong max, uint n)
287 : circular_buffer_queue<Slave_job_group>(max), inited(false),
288 last_done(key_memory_Slave_job_group_group_relay_log_name)
289 {
290 if (max >= (ulong) -1 || !inited_queue)
291 return;
292 else
293 inited= TRUE;
294
295 last_done.resize(n);
296
297 lwm.group_relay_log_name=
298 (char *) my_malloc(key_memory_Slave_job_group_group_relay_log_name,
299 FN_REFLEN + 1, MYF(0));
300 lwm.group_relay_log_name[0]= 0;
301 lwm.sequence_number= SEQ_UNINIT;
302 }
303
~Slave_committed_queue()304 ~Slave_committed_queue ()
305 {
306 if (inited)
307 {
308 my_free(lwm.group_relay_log_name);
309 free_dynamic_items(); // free possibly left allocated strings in GAQ list
310 }
311 }
312
313 #ifndef NDEBUG
314 bool count_done(Relay_log_info* rli);
315 #endif
316
317 /* Checkpoint routine refreshes the queue */
318 ulong move_queue_head(Slave_worker_array *ws);
319 /* Method is for slave shutdown time cleanup */
320 void free_dynamic_items();
321 /*
322 returns a pointer to Slave_job_group struct instance as indexed by arg
323 in the circular buffer dyn-array
324 */
get_job_group(ulong ind)325 Slave_job_group* get_job_group(ulong ind)
326 {
327 assert(ind < size);
328 return &m_Q[ind];
329 }
330
331 /**
332 Assignes @c assigned_group_index to an index of enqueued item
333 and returns it.
334 */
en_queue(Slave_job_group * item)335 ulong en_queue(Slave_job_group *item)
336 {
337 return assigned_group_index=
338 circular_buffer_queue<Slave_job_group>::en_queue(item);
339 }
340
341 /**
342 Dequeue from head.
343
344 @param [out] item A pointer to the being dequeued item.
345 @return The queue's array index that the de-queued item located at,
346 or an error encoded in beyond the index legacy range.
347 */
de_queue(Slave_job_group * item)348 ulong de_queue(Slave_job_group *item)
349 {
350 return circular_buffer_queue<Slave_job_group>::de_queue(item);
351 }
352
353 /**
354 Similar to de_queue() but removing an item from the tail side.
355
356 @param [out] item A pointer to the being dequeued item.
357 @return the queue's array index that the de-queued item
358 located at, or an error.
359 */
de_tail(Slave_job_group * item)360 ulong de_tail(Slave_job_group *item)
361 {
362 return circular_buffer_queue<Slave_job_group>::de_tail(item);
363 }
364
365 ulong find_lwm(Slave_job_group**, ulong);
366 };
367
368
369 /**
370 @return the index where the arg item has been located
371 or an error.
372 */
373 template <typename Element_type>
en_queue(Element_type * item)374 ulong circular_buffer_queue<Element_type>::en_queue(Element_type *item)
375 {
376 ulong ret;
377 if (avail == size)
378 {
379 assert(avail == m_Q.size());
380 return (ulong) -1;
381 }
382
383 // store
384
385 ret= avail;
386 m_Q[avail]= *item;
387
388 // pre-boundary cond
389 if (entry == size)
390 entry= avail;
391
392 avail= (avail + 1) % size;
393 len++;
394
395 // post-boundary cond
396 if (avail == entry)
397 avail= size;
398
399 assert(avail == entry ||
400 len == (avail >= entry) ?
401 (avail - entry) : (size + avail - entry));
402 assert(avail != entry);
403
404 return ret;
405 }
406
407
408 /**
409 Dequeue from head.
410
411 @param [out] item A pointer to the being dequeued item.
412 @return the queue's array index that the de-queued item
413 located at, or an error as an int outside the legacy
414 [0, size) (value `size' is excluded) range.
415 */
416 template <typename Element_type>
de_queue(Element_type * item)417 ulong circular_buffer_queue<Element_type>::de_queue(Element_type *item)
418 {
419 ulong ret;
420 if (entry == size)
421 {
422 assert(len == 0);
423 return (ulong) -1;
424 }
425
426 ret= entry;
427 *item= m_Q[entry];
428 len--;
429
430 // pre boundary cond
431 if (avail == size)
432 avail= entry;
433 entry= (entry + 1) % size;
434
435 // post boundary cond
436 if (avail == entry)
437 entry= size;
438
439 assert(entry == size ||
440 (len == (avail >= entry)? (avail - entry) :
441 (size + avail - entry)));
442 assert(avail != entry);
443
444 return ret;
445 }
446
447
448 template <typename Element_type>
de_tail(Element_type * item)449 ulong circular_buffer_queue<Element_type>::de_tail(Element_type *item)
450 {
451 if (entry == size)
452 {
453 assert(len == 0);
454 return (ulong) -1;
455 }
456
457 avail= (entry + len - 1) % size;
458 *item= m_Q[avail];
459 len--;
460
461 // post boundary cond
462 if (avail == entry)
463 entry= size;
464
465 assert(entry == size ||
466 (len == (avail >= entry)? (avail - entry) :
467 (size + avail - entry)));
468 assert(avail != entry);
469
470 return avail;
471 }
472
473
474 class Slave_jobs_queue : public circular_buffer_queue<Slave_job_item>
475 {
476 public:
Slave_jobs_queue()477 Slave_jobs_queue() : circular_buffer_queue<Slave_job_item>() {}
478 /*
479 Coordinator marks with true, Worker signals back at queue back to
480 available
481 */
482 bool overfill;
483 ulonglong waited_overfill;
484 };
485
486 class Slave_worker : public Relay_log_info
487 {
488 public:
489 Slave_worker(Relay_log_info *rli
490 #ifdef HAVE_PSI_INTERFACE
491 ,PSI_mutex_key *param_key_info_run_lock,
492 PSI_mutex_key *param_key_info_data_lock,
493 PSI_mutex_key *param_key_info_sleep_lock,
494 PSI_mutex_key *param_key_info_thd_lock,
495 PSI_mutex_key *param_key_info_data_cond,
496 PSI_mutex_key *param_key_info_start_cond,
497 PSI_mutex_key *param_key_info_stop_cond,
498 PSI_mutex_key *param_key_info_sleep_cond
499 #endif
500 , uint param_id, const char *param_channel
501 );
502
503 virtual ~Slave_worker();
504
505 Slave_jobs_queue jobs; // assignment queue containing events to execute
506 mysql_mutex_t jobs_lock; // mutex for the jobs queue
507 mysql_cond_t jobs_cond; // condition variable for the jobs queue
508 Relay_log_info *c_rli; // pointer to Coordinator's rli
509
510 Prealloced_array<db_worker_hash_entry*, SLAVE_INIT_DBS_IN_GROUP>
511 curr_group_exec_parts; // Current Group Executed Partitions
512
513 bool curr_group_seen_begin; // is set to TRUE with explicit B-event
514 #ifndef NDEBUG
515 bool curr_group_seen_sequence_number; // is set to TRUE about starts_group()
516 #endif
517 ulong id; // numberic identifier of the Worker
518
519 /*
520 Worker runtime statictics
521 */
522 // the index in GAQ of the last processed group by this Worker
523 volatile ulong last_group_done_index;
524 ulonglong last_groups_assigned_index; // index of previous group assigned to worker
525 ulong wq_empty_waits; // how many times got idle
526 ulong events_done; // how many events (statements) processed
527 ulong groups_done; // how many groups (transactions) processed
528 volatile int curr_jobs; // number of active assignments
529 // number of partitions allocated to the worker at point in time
530 long usage_partition;
531 // symmetric to rli->mts_end_group_sets_max_dbs
532 bool end_group_sets_max_dbs;
533
534 volatile bool relay_log_change_notified; // Coord sets and resets, W can read
535 volatile bool checkpoint_notified; // Coord sets and resets, W can read
536 volatile bool master_log_change_notified; // Coord sets and resets, W can read
537 /*
538 The variable serves to Coordinator as a memo to itself
539 to notify a Worker about the fact that a new FD has been read.
540 Normally, the value is true, to mean the Worker is notified.
541 When Coordinator reads a new FD it changes the value to false.
542 When Coordinator schedules to a Worker the first event following the new FD,
543 it propagates the new FD to the Worker through Slave_job_group::new_fd_event.
544 Afterwards Coordinator returns the value back to the regular true,
545 to denote things done. Worker will adapt to the new FD once it
546 takes on a first event of the marked group.
547 */
548 bool fd_change_notified;
549 ulong bitmap_shifted; // shift the last bitmap at receiving new CP
550 // WQ current excess above the overrun level
551 long wq_overrun_cnt;
552 /*
553 number of events starting from which Worker queue is regarded as
554 close to full. The number of the excessive events yields a weight factor
555 to compute Coordinator's nap.
556 */
557 ulong overrun_level;
558 /*
559 reverse to overrun: the number of events below which Worker is
560 considered underruning
561 */
562 ulong underrun_level;
563 /*
564 Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
565 When WQ length is dropped below overrun the counter is reset.
566 */
567 ulong excess_cnt;
568 /*
569 Coordinates of the last CheckPoint (CP) this Worker has
570 acknowledged; part of is persisent data
571 */
572 char checkpoint_relay_log_name[FN_REFLEN];
573 ulonglong checkpoint_relay_log_pos;
574 char checkpoint_master_log_name[FN_REFLEN];
575 ulonglong checkpoint_master_log_pos;
576 MY_BITMAP group_executed; // bitmap describes groups executed after last CP
577 MY_BITMAP group_shifted; // temporary bitmap to compute group_executed
578 ulong checkpoint_seqno; // the most significant ON bit in group_executed
579 /* Initial value of FD-for-execution version until it's gets known. */
580 ulong server_version;
581 enum en_running_state
582 {
583 NOT_RUNNING= 0,
584 RUNNING= 1,
585 ERROR_LEAVING= 2, // is set by Worker
586 STOP= 3, // is set by Coordinator upon reciving STOP
587 STOP_ACCEPTED= 4 // is set by worker upon completing job when STOP SLAVE is issued
588 };
589
590 /*
591 This function is used to make a copy of the worker object before we
592 destroy it on STOP SLAVE. This new object is then used to report the
593 worker status until next START SLAVE following which the new worker objetcs
594 will be used.
595 */
596 void copy_values_for_PFS(ulong worker_id, en_running_state running_status,
597 THD *worker_thd, const Error &last_error,
598 const Gtid_specification ¤tly_executing_gtid);
599
600 /*
601 The running status is guarded by jobs_lock mutex that a writer
602 Coordinator or Worker itself needs to hold when write a new value.
603 */
604 en_running_state volatile running_status;
605 /*
606 exit_incremented indicates whether worker has contributed to max updated index.
607 By default it is set to false. When the worker contibutes for the first time this
608 variable is set to true.
609 */
610 bool exit_incremented;
611
612 int init_worker(Relay_log_info*, ulong);
613 int rli_init_info(bool);
614 int flush_info(bool force= FALSE);
615 static size_t get_number_worker_fields();
616 void slave_worker_ends_group(Log_event*, int);
617 const char *get_master_log_name();
get_master_log_pos()618 ulonglong get_master_log_pos() { return master_log_pos; };
set_master_log_pos(ulong val)619 ulonglong set_master_log_pos(ulong val) { return master_log_pos= val; };
620 bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
621 /*
622 When commit fails clear bitmap for executed worker group. Revert back the
623 positions to the old positions that existed before commit using the checkpoint.
624
625 @param Slave_job_group a pointer to Slave_job_group struct instance which
626 holds group master log pos, group relay log pos and checkpoint positions.
627 */
628 void rollback_positions(Slave_job_group *ptr_g);
629 bool reset_recovery_info();
630 /**
631 The method runs at Worker initalization, at runtime when
632 Coordinator supplied a new FD event for execution context, and at
633 the Worker pool shutdown.
634 Similarly to the Coordinator's
635 Relay_log_info::set_rli_description_event() the possibly existing
636 old FD is destoyed, carefully; each worker decrements
637 Format_description_log_event::usage_counter and when it is made
638 zero the destructor runs.
639 Unlike to Coordinator's role, the usage counter of the new FD is *not*
640 incremented, see @c Log_event::get_slave_worker() where and why it's done
641 there.
642
643 Notice, the method is run as well by Coordinator per each Worker at MTS
644 shutdown time.
645
646 Todo: consider to merge logics of the method with that of
647 Relay_log_info class.
648
649 @param fdle pointer to a new Format_description_log_event
650 */
set_rli_description_event(Format_description_log_event * fdle)651 void set_rli_description_event(Format_description_log_event *fdle)
652 {
653 DBUG_ENTER("Slave_worker::set_rli_description_event");
654
655 if (fdle)
656 {
657 /*
658 When the master rotates its binary log, set gtid_next to
659 NOT_YET_DETERMINED. This tells the slave thread that:
660
661 - If a Gtid_log_event is read subsequently, gtid_next will be
662 set to the given GTID (this is done in
663 gtid_pre_statement_checks()).
664
665 - If a statement is executed before any Gtid_log_event, then
666 gtid_next is set to anonymous (this is done in
667 Gtid_log_event::do_apply_event().
668
669 It is imporant to not set GTID_NEXT=NOT_YET_DETERMINED in the
670 middle of a transaction. If that would happen when
671 GTID_MODE=ON, the next statement would fail because it
672 implicitly sets GTID_NEXT=ANONYMOUS, which is disallowed when
673 GTID_MODE=ON. So then there would be no way to end the
674 transaction; any attempt to do so would result in this error.
675 (It is not possible for the slave threads to have
676 gtid_next.type==AUTOMATIC or UNDEFINED in the middle of a
677 transaction, but it is possible for a client thread to have
678 gtid_next.type==AUTOMATIC and issue a BINLOG statement
679 containing this Format_description_log_event.)
680 */
681 if (!is_in_group() &&
682 (info_thd->variables.gtid_next.type == AUTOMATIC_GROUP ||
683 info_thd->variables.gtid_next.type == UNDEFINED_GROUP))
684 {
685 DBUG_PRINT("info", ("Setting gtid_next.type to NOT_YET_DETERMINED_GROUP"));
686 info_thd->variables.gtid_next.set_not_yet_determined();
687 }
688 adapt_to_master_version_updown(fdle->get_product_version(),
689 get_master_server_version());
690 }
691 if (rli_description_event)
692 {
693 assert(rli_description_event->usage_counter.atomic_get() > 0);
694
695 if (rli_description_event->usage_counter.atomic_add(-1) == 1)
696 {
697 /* The being deleted by Worker FD can't be the latest one */
698 assert(rli_description_event != c_rli->get_rli_description_event());
699
700 delete rli_description_event;
701 }
702 }
703 rli_description_event= fdle;
704
705 DBUG_VOID_RETURN;
706 }
707
reset_gaq_index()708 inline void reset_gaq_index() { gaq_index= c_rli->gaq->size; };
set_gaq_index(ulong val)709 inline void set_gaq_index(ulong val)
710 {
711 if (gaq_index == c_rli->gaq->size)
712 gaq_index= val;
713 };
714
715 int slave_worker_exec_event(Log_event *ev);
716 /**
717 Make the necessary changes to both the `Slave_worker` and current
718 `Log_event` objects, before retrying to apply the transaction.
719
720 Since the event is going to be re-read from the relay-log file, there
721 may be actions needed to be taken to reset the state both of `this`
722 instance, as well as of the current `Log_event` being processed.
723
724 @param event The `Log_event` object currently being processed.
725 */
726 void prepare_for_retry(Log_event &event);
727 bool retry_transaction(uint start_relay_number, my_off_t start_relay_pos,
728 uint end_relay_number, my_off_t end_relay_pos);
729
730 bool set_info_search_keys(Rpl_info_handler *to);
731
732
733 /**
734 Get coordinator's RLI. Especially used get the rli from
735 a slave thread, like this: thd->rli_slave->get_c_rli();
736 thd could be a SQL thread or a worker thread.
737 */
get_c_rli()738 virtual Relay_log_info* get_c_rli()
739 {
740 return c_rli;
741 }
742
743 /**
744 return an extension "for channel channel_name"
745 for error messages per channel
746 */
747 const char* get_for_channel_str(bool upper_case= false) const;
748
sequence_number()749 longlong sequence_number()
750 {
751 Slave_job_group* ptr_g= c_rli->gaq->get_job_group(gaq_index);
752 return ptr_g->sequence_number;
753 }
754
found_order_commit_deadlock()755 bool found_order_commit_deadlock() { return m_order_commit_deadlock; }
report_order_commit_deadlock()756 void report_order_commit_deadlock() { m_order_commit_deadlock= true; }
757 /**
758 @return either the master server version as extracted from the last
759 installed Format_description_log_event, or when it was not
760 installed then the slave own server version.
761 */
get_master_server_version()762 ulong get_master_server_version()
763 {
764 return !get_rli_description_event() ? server_version :
765 get_rli_description_event()->get_product_version();
766 }
767
768 // overridden new and delete operators for 64 byte alignment
769 static void* operator new(size_t request);
770 static void operator delete(void * ptr);
771
772 protected:
773
774 virtual void do_report(loglevel level, int err_code,
775 const char *msg, va_list v_args) const;
776
777 private:
778 ulong gaq_index; // GAQ index of the current assignment
779 ulonglong master_log_pos; // event's cached log_pos for possibile error report
780 void end_info();
781 bool read_info(Rpl_info_handler *from);
782 bool write_info(Rpl_info_handler *to);
783 bool m_order_commit_deadlock;
784
785 Slave_worker& operator=(const Slave_worker& info);
786 Slave_worker(const Slave_worker& info);
787 bool worker_sleep(ulong seconds);
788 bool read_and_apply_events(uint start_relay_number, my_off_t start_relay_pos,
789 uint end_relay_number, my_off_t end_relay_pos);
790 void assign_partition_db(Log_event *ev);
791
reset_order_commit_deadlock()792 void reset_order_commit_deadlock() { m_order_commit_deadlock= false; }
793 public:
794 /**
795 Returns an array with the expected column numbers of the primary key
796 fields of the table repository.
797 */
798 static const uint *get_table_pk_field_indexes();
799 /**
800 Returns the index of the Channel_name field of the table repository.
801 */
802 static uint get_channel_field_index();
803
804 /**
805 This class aims to do cleanup for workers in retry_transaction method.
806 */
807 class Retry_context_sentry {
808 public:
809 /**
810 Constructor to inilizate class objects and flags.
811 */
812 Retry_context_sentry(Slave_worker& parent);
813 /**
814 This destructor calls clean() method which performs the cleanup.
815 */
816 virtual ~Retry_context_sentry();
817 /**
818 Operator to set the value of m_cleaned_up.
819
820 @param [out] Flag to check for cleanup.
821 @return the value of flag for each worker.
822
823 */
824 Retry_context_sentry& operator=(bool is_cleaned_up);
825 /**
826 This method performs the cleanup and resets m_order_commit_deadlock flag.
827 */
828 void clean();
829 private:
830 Slave_worker& m_parent; // Object of enclosed class.
831 bool m_is_cleaned_up; // Flag to check for cleanup.
832 };
833 };
834
835 void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
836 bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item);
837 bool set_max_updated_index_on_stop(Slave_worker *worker,
838 Slave_job_item *job_item);
839
840 TABLE* mts_move_temp_table_to_entry(TABLE*, THD*, db_worker_hash_entry*);
841 TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*);
842 // Auxiliary function
843 TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*, enum_mts_parallel_type);
844
845 bool append_item_to_jobs(slave_job_item *job_item,
846 Slave_worker *w, Relay_log_info *rli);
847 Slave_job_item* de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
848
get_thd_worker(THD * thd)849 inline Slave_worker* get_thd_worker(THD *thd)
850 {
851 return static_cast<Slave_worker *>(thd->rli_slave);
852 }
853
854 #endif // HAVE_REPLICATION
855 #endif
856