/* Copyright (c) 2011, 2021, Oracle and/or its affiliates.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */

#ifndef RPL_RLI_PDB_H
#define RPL_RLI_PDB_H

#ifdef HAVE_REPLICATION

#include "my_global.h"
#include "my_bitmap.h"         // MY_BITMAP
#include "prealloced_array.h"  // Prealloced_array
#include "log_event.h"         // Format_description_log_event
#include "rpl_mts_submode.h"   // enum_mts_parallel_type
#include "rpl_rli.h"           // Relay_log_info
#include "rpl_slave.h"         // MTS_WORKER_UNDEF

#ifndef NDEBUG
extern ulong w_rr;
#endif
/**
  Legend used throughout this module:

  C  - Coordinator
  CP - checkpoint
  W  - Worker

  B-event - an event that Begins a group (a transaction)
  T-event - an event that Terminates a group (a transaction)
*/

/* Assigned Partition Hash (APH) entry */
typedef struct st_db_worker_hash_entry
{
  uint  db_len;
  const char *db;
  Slave_worker *worker;
  /*
    The number of transactions pending on this database.
    This should only be modified while holding slave_worker_hash_lock.
  */
  long usage;
  /*
    The list of temp tables belonging to @c db is attached to the assigned
    @c worker and becomes its thd->temporary_tables.
    The list is updated on every DDL, including CREATE and DROP.
    It is removed from the entry and merged into the Coordinator's
    thd->temporary_tables when the slave stops or the APH oversizes.
  */
  TABLE* volatile temporary_tables;

  /* todo: relax concurrency to mimic record-level locking.
     That is, augment the entry with a mutex/cond pair:
     pthread_mutex_t
     pthread_cond_t
     timestamp updated_at; */

} db_worker_hash_entry;
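
/*
  A sketch (an assumption about the call sites, not a quote of them) of the
  temp-table hand-over described above, using the helper declared near the
  end of this file:

    // attach the entry's list to the assigned Worker:
    mts_move_temp_tables_to_thd(worker_thd, entry->temporary_tables);
    ...
    // merge it back into the Coordinator on slave stop or APH oversize:
    mts_move_temp_tables_to_thd(coord_thd, entry->temporary_tables);
*/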

bool init_hash_workers(Relay_log_info *rli);
void destroy_hash_workers(Relay_log_info*);
Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
                               db_worker_hash_entry **ptr_entry,
                               bool need_temp_tables, Slave_worker *w);
Slave_worker *get_least_occupied_worker(Relay_log_info *rli,
                                        Slave_worker_array *workers,
                                        Log_event* ev);
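
/*
  A minimal usage sketch (illustrative only) of the two scheduling entry
  points above: in the per-database mode Coordinator maps the event's
  database to a Worker through the APH, while the logical-clock submode
  simply picks the least occupied Worker. The call arguments here are
  assumptions for illustration:

    db_worker_hash_entry *entry= NULL;
    Slave_worker *w=
      map_db_to_worker("test", c_rli, &entry, true, NULL);
    ...
    Slave_worker *w2= get_least_occupied_worker(c_rli, &c_rli->workers, ev);
*/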

#define SLAVE_INIT_DBS_IN_GROUP 4     // initial allocation for CGEP dynarray

#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2

typedef struct st_slave_job_group
{
  char *group_master_log_name;   // (actually redundant)
  /*
    T-event log_pos filled in by the Worker for the CheckPoint (CP)
  */
  my_off_t group_master_log_pos;

  /*
     When the relay-log name changes, Coordinator allocates and fills in a new
     name for the relay-log; otherwise it fills in NULL.
     Coordinator keeps track of whether each Worker has been notified of the
     update, to make sure the routine runs once per change.

     W checks the value at commit time and memorizes a non-NULL value.
     Freeing a non-NULL value is left to Coordinator at CP.
  */
  char     *group_relay_log_name; // The value is the last seen relay-log
  my_off_t group_relay_log_pos;  // filled in by W
  ulong worker_id;
  Slave_worker *worker;
  ulonglong total_seqno;

  my_off_t master_log_pos;       // B-event log_pos
  /* checkpoint coordinates are reset by periodic and special (Rotate event) CPs */
  uint  checkpoint_seqno;
  my_off_t checkpoint_log_pos; // T-event log_pos filled in by W for CheckPoint
  char*    checkpoint_log_name;
  my_off_t checkpoint_relay_log_pos; // T-event log_pos filled in by W for CheckPoint
  char*    checkpoint_relay_log_name;
  int32    done;  // Flag raised by W, read and reset by Coordinator
  ulong    shifted;     // shift of the last CP bitmap upon receiving a new CP
  time_t   ts;          // Group's timestamp, used to update Seconds_behind_master
#ifndef NDEBUG
  bool     notified;    // to debug group_master_log_name change notification
#endif
  /* Clock-based scheduler requirement: */
  longlong last_committed; // commit parent timestamp
  longlong sequence_number;   // transaction's logical timestamp
  /*
    After Coordinator has seen a new FD event, it sets this member to
    point to the new event, once per worker. Coordinator does so
    when it schedules the first group following the FD event to a worker.
    It checks the Slave_worker::fd_change_notified flag to decide whether
    to do this or not.
    When the worker executes the group, it replaces its currently
    active FD with the new FD once it takes on the group's first event. It
    checks this member and resets it after the FD replacement is done.

    The member is somewhat lock-free. It's updated by Coordinator and
    read by the Worker without holding any mutex. That's still safe
    thanks to Slave_worker::jobs_lock, which works as a synchronizer:
    the Worker can't read any stale info.
    The member is updated by Coordinator when it decides to which Worker
    an event following a new FD is to be scheduled.
    After Coordinator has chosen a Worker, it queues the event to it,
    necessarily taking Slave_worker::jobs_lock. The Worker grabs the
    mutex later, when pulling the event from the queue, and releases
    it before reading from this member.

    This sequence of actions shows that the write operation always precedes
    the read one, and ensures no stale FD info is passed to the
    Worker.
  */
  Format_description_log_event *new_fd_event;
  /*
    Coordinator fills the struct with defaults and options at the start of
    a group distribution.
  */
  void reset(my_off_t master_pos, ulonglong seqno)
  {
    master_log_pos= master_pos;
    group_master_log_pos= group_relay_log_pos= 0;
    group_master_log_name= NULL; // todo: remove
    group_relay_log_name= NULL;
    worker_id= MTS_WORKER_UNDEF;
    total_seqno= seqno;
    checkpoint_log_name= NULL;
    checkpoint_log_pos= 0;
    checkpoint_relay_log_name= NULL;
    checkpoint_relay_log_pos= 0;
    checkpoint_seqno= (uint) -1;
    done= 0;
    ts= 0;
#ifndef NDEBUG
    notified= false;
#endif
    last_committed= SEQ_UNINIT;
    sequence_number= SEQ_UNINIT;
    new_fd_event= NULL;
  }
} Slave_job_group;

/**
   The class defines a type of queue with a predefined maximum size that is
   implemented using a circular memory buffer.
   That is, items of the queue are accessed as indexed elements of
   the array buffer, in such a way that when the index value reaches
   its maximum it wraps around to point to the first buffer element.
*/
template<typename Element_type>
class circular_buffer_queue
{
public:

  Prealloced_array<Element_type, 1, true> m_Q;
  ulong size;           // the size of the queue in terms of elements
  ulong avail;          // first available index to append at (next to tail)
  ulong entry;          // the head index or the entry point to the queue
  volatile ulong len;   // actual length
  bool inited_queue;

  circular_buffer_queue(ulong max) :
    m_Q(PSI_INSTRUMENT_ME),
    size(max), avail(0), entry(max), len(0), inited_queue(false)
  {
    if (!m_Q.reserve(size))
      inited_queue= true;
    m_Q.resize(size);
  }
  circular_buffer_queue() : m_Q(PSI_INSTRUMENT_ME), inited_queue(false) {}
  ~circular_buffer_queue ()
  {
  }

  /**
     The content of the item being dequeued is copied to the location
     the argument points at.

     @param [out] item A pointer to the item being dequeued.
     @return the queue's array index the dequeued item was located at,
     or an error encoded as a value beyond the valid index range.
  */
  ulong de_queue(Element_type *item);
  /**
    Similar to de_queue(), but the extraction happens from the tail side.

    @param [out] item A pointer to the item being dequeued.
    @return the queue's array index the dequeued item was located at,
            or an error.
  */
  ulong de_tail(Element_type *item);

  /**
    @return the index where the argument item is located,
            or an error encoded as a value beyond the valid range
            [0, size) (the value `size' is excluded).
  */
  ulong en_queue(Element_type *item);
  /**
     @return a pointer to the head item of the queue, or NULL if the
             queue is empty.
  */
  Element_type* head_queue()
  {
    if (empty())
      return NULL;
    return &m_Q[entry];
  }

  bool   gt(ulong i, ulong k); // comparison of the ordering of two entries
  /* index is within the valid range */
  bool in(ulong k) { return !empty() &&
      (entry > avail ? (k >= entry || k < avail) : (k >= entry && k < avail)); }
  bool empty() { return entry == size; }
  bool full() { return avail == size; }
};
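
/*
  A minimal usage sketch (illustrative only, not part of the API) of the
  sentinel conventions above: `entry == size' denotes an empty queue,
  `avail == size' a full one, and (ulong) -1 signals an error:

    circular_buffer_queue<int> q(4); // entry == 4 (empty), avail == 0
    int v= 7, out;
    ulong idx= q.en_queue(&v);       // idx == 0; entry == 0, len == 1
    if (idx != (ulong) -1)
      idx= q.de_queue(&out);         // idx == 0; out == 7; queue empty again
*/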


/**
  Group Assigned Queue whose first element identifies the first gap
  in the committed sequence. The head of the queue is therefore next to
  the low-water-mark.
*/
class Slave_committed_queue : public circular_buffer_queue<Slave_job_group>
{
public:

  bool inited;

  /* execution of master's Rotate event */
  void update_current_binlog(const char *post_rotate);

  /*
     The Low-Water-Mark as of the last checkpoint time
  */
  Slave_job_group lwm;

  /* indexes of the last processed group, per worker */
  Prealloced_array<ulonglong, 1> last_done;

  /* index in GAQ of the group being assigned */
  ulong assigned_group_index;

  Slave_committed_queue (const char *log, ulong max, uint n)
    : circular_buffer_queue<Slave_job_group>(max), inited(false),
      last_done(key_memory_Slave_job_group_group_relay_log_name)
  {
    if (max >= (ulong) -1 || !inited_queue)
      return;
    else
      inited= TRUE;

    last_done.resize(n);

    lwm.group_relay_log_name=
      (char *) my_malloc(key_memory_Slave_job_group_group_relay_log_name,
                         FN_REFLEN + 1, MYF(0));
    lwm.group_relay_log_name[0]= 0;
    lwm.sequence_number= SEQ_UNINIT;
  }

  ~Slave_committed_queue ()
  {
    if (inited)
    {
      my_free(lwm.group_relay_log_name);
      free_dynamic_items();  // free any remaining allocated strings in the GAQ list
    }
  }

#ifndef NDEBUG
  bool count_done(Relay_log_info* rli);
#endif

  /* Checkpoint routine refreshes the queue */
  ulong move_queue_head(Slave_worker_array *ws);
  /* Method is for slave shutdown-time cleanup */
  void free_dynamic_items();
  /*
     Returns a pointer to the Slave_job_group struct instance indexed by
     the argument in the circular buffer dyn-array.
  */
  Slave_job_group* get_job_group(ulong ind)
  {
    assert(ind < size);
    return &m_Q[ind];
  }

  /**
     Assigns @c assigned_group_index to the index of the enqueued item
     and returns it.
  */
  ulong en_queue(Slave_job_group *item)
  {
    return assigned_group_index=
      circular_buffer_queue<Slave_job_group>::en_queue(item);
  }

  /**
    Dequeue from the head.

    @param [out] item A pointer to the item being dequeued.
    @return The queue's array index the dequeued item was located at,
            or an error encoded as a value beyond the valid index range.
  */
  ulong de_queue(Slave_job_group *item)
  {
    return circular_buffer_queue<Slave_job_group>::de_queue(item);
  }

  /**
    Similar to de_queue(), but removing an item from the tail side.

    @param [out] item A pointer to the item being dequeued.
    @return the queue's array index the dequeued item was located at,
            or an error.
  */
  ulong de_tail(Slave_job_group *item)
  {
    return circular_buffer_queue<Slave_job_group>::de_tail(item);
  }

  ulong find_lwm(Slave_job_group**, ulong);
};
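
/*
  A sketch (assumptions noted here; the authoritative logic lives in
  Slave_committed_queue::move_queue_head() in rpl_rli_pdb.cc) of how the
  checkpoint routine advances the low-water-mark: completed groups are
  popped off the head until the first group whose `done' flag is still
  clear, which identifies the first gap in the committed sequence:

    Slave_job_group g;
    while (!gaq->empty() && gaq->head_queue()->done)
    {
      gaq->de_queue(&g);   // pop a committed group off the head
      gaq->lwm= g;         // it becomes the new low-water-mark
    }
*/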


/**
    @return  the index where the argument item has been located,
             or an error.
*/
template <typename Element_type>
ulong circular_buffer_queue<Element_type>::en_queue(Element_type *item)
{
  ulong ret;
  if (avail == size)
  {
    assert(avail == m_Q.size());
    return (ulong) -1;
  }

  // store

  ret= avail;
  m_Q[avail]= *item;

  // pre-boundary cond
  if (entry == size)
    entry= avail;

  avail= (avail + 1) % size;
  len++;

  // post-boundary cond
  if (avail == entry)
    avail= size;

  assert(avail == entry ||
         len == ((avail >= entry) ?
                 (avail - entry) : (size + avail - entry)));
  assert(avail != entry);

  return ret;
}
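
/*
  A worked example (illustrative only) of the boundary conditions above,
  filling a queue of size 2:

    // construction:  entry == 2 (empty sentinel), avail == 0, len == 0
    // en_queue(A) -> returns 0; entry= 0, avail= 1, len= 1
    // en_queue(B) -> returns 1; avail wraps to 0 == entry,
    //                so avail= 2 (full sentinel), len= 2
    // en_queue(C) -> avail == size, the queue is full: returns (ulong) -1
*/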


/**
  Dequeue from the head.

  @param [out] item A pointer to the item being dequeued.
  @return the queue's array index the dequeued item was located at,
          or an error encoded as a value outside the valid
          [0, size) range (the value `size' is excluded).
*/
template <typename Element_type>
ulong circular_buffer_queue<Element_type>::de_queue(Element_type *item)
{
  ulong ret;
  if (entry == size)
  {
    assert(len == 0);
    return (ulong) -1;
  }

  ret= entry;
  *item= m_Q[entry];
  len--;

  // pre-boundary cond
  if (avail == size)
    avail= entry;
  entry= (entry + 1) % size;

  // post-boundary cond
  if (avail == entry)
    entry= size;

  assert(entry == size ||
         len == ((avail >= entry) ? (avail - entry) :
                 (size + avail - entry)));
  assert(avail != entry);

  return ret;
}


template <typename Element_type>
ulong circular_buffer_queue<Element_type>::de_tail(Element_type *item)
{
  if (entry == size)
  {
    assert(len == 0);
    return (ulong) -1;
  }

  avail= (entry + len - 1) % size;
  *item= m_Q[avail];
  len--;

  // post-boundary cond
  if (avail == entry)
    entry= size;

  assert(entry == size ||
         len == ((avail >= entry) ? (avail - entry) :
                 (size + avail - entry)));
  assert(avail != entry);

  return avail;
}


class Slave_jobs_queue : public circular_buffer_queue<Slave_job_item>
{
public:
  Slave_jobs_queue() : circular_buffer_queue<Slave_job_item>() {}
  /*
     Coordinator marks it with TRUE when the queue is full; the Worker
     signals back when the queue drains back to available capacity.
  */
  bool overfill;
  ulonglong waited_overfill;
};
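
/*
  A sketch of the overfill handshake (an assumption: this mirrors the logic
  of append_item_to_jobs() in rpl_slave.cc rather than quoting it):

    // Coordinator, finding the Worker's queue full:
    mysql_mutex_lock(&worker->jobs_lock);
    worker->jobs.overfill= TRUE;
    worker->jobs.waited_overfill++;
    mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock);  // wait for room
    mysql_mutex_unlock(&worker->jobs_lock);

    // Worker, after de-queueing an event:
    mysql_cond_signal(&worker->jobs_cond);  // wake up a waiting Coordinator
*/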

class Slave_worker : public Relay_log_info
{
public:
  Slave_worker(Relay_log_info *rli
#ifdef HAVE_PSI_INTERFACE
               ,PSI_mutex_key *param_key_info_run_lock,
               PSI_mutex_key *param_key_info_data_lock,
               PSI_mutex_key *param_key_info_sleep_lock,
               PSI_mutex_key *param_key_info_thd_lock,
               PSI_mutex_key *param_key_info_data_cond,
               PSI_mutex_key *param_key_info_start_cond,
               PSI_mutex_key *param_key_info_stop_cond,
               PSI_mutex_key *param_key_info_sleep_cond
#endif
               , uint param_id, const char *param_channel
              );

  virtual ~Slave_worker();

  Slave_jobs_queue jobs;   // assignment queue containing events to execute
  mysql_mutex_t jobs_lock; // mutex for the jobs queue
  mysql_cond_t  jobs_cond; // condition variable for the jobs queue
  Relay_log_info *c_rli;   // pointer to Coordinator's rli

  Prealloced_array<db_worker_hash_entry*, SLAVE_INIT_DBS_IN_GROUP>
  curr_group_exec_parts; // Current Group Executed Partitions

  bool curr_group_seen_begin; // set to TRUE by an explicit B-event
#ifndef NDEBUG
  bool curr_group_seen_sequence_number; // set to TRUE when the group's sequence number has been seen
#endif
  ulong id;                 // numeric identifier of the Worker

  /*
    Worker runtime statistics
  */
  // the index in GAQ of the last group processed by this Worker
  volatile ulong last_group_done_index;
  ulonglong last_groups_assigned_index; // index of the previous group assigned to the Worker
  ulong wq_empty_waits;  // how many times the Worker went idle
  ulong events_done;     // how many events (statements) were processed
  ulong groups_done;     // how many groups (transactions) were processed
  volatile int curr_jobs; // number of active assignments
  // number of partitions allocated to the Worker at a point in time
  long usage_partition;
  // symmetric to rli->mts_end_group_sets_max_dbs
  bool end_group_sets_max_dbs;

  volatile bool relay_log_change_notified; // Coord sets and resets, W can read
  volatile bool checkpoint_notified; // Coord sets and resets, W can read
  volatile bool master_log_change_notified; // Coord sets and resets, W can read
  /*
    The variable serves Coordinator as a memo to itself
    to notify a Worker about the fact that a new FD has been read.
    Normally, the value is true, meaning the Worker has been notified.
    When Coordinator reads a new FD, it changes the value to false.
    When Coordinator schedules to a Worker the first event following the new FD,
    it propagates the new FD to the Worker through Slave_job_group::new_fd_event.
    Afterwards Coordinator sets the value back to the regular true,
    to denote the notification is done. The Worker will adapt to the new FD
    once it takes on the first event of the marked group.
  */
  bool fd_change_notified;
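  /*
    A sketch of the notification protocol described above (illustrative
    pseudocode, not the actual implementation):

      // Coordinator, on reading a new FD event:
      worker->fd_change_notified= false;       // memo: Worker not yet told
      // Coordinator, scheduling the first post-FD group to this Worker:
      if (!worker->fd_change_notified)
      {
        group->new_fd_event= new_fd;           // published under jobs_lock
        worker->fd_change_notified= true;
      }
      // Worker, taking on the group's first event:
      if (group->new_fd_event)
        set_rli_description_event(group->new_fd_event);
  */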
  ulong bitmap_shifted;  // shift of the last bitmap upon receiving a new CP
  // WQ current excess above the overrun level
  long wq_overrun_cnt;
  /*
    Number of events starting from which the Worker queue is regarded as
    close to full. The number of the excessive events yields a weight factor
    used to compute Coordinator's nap.
  */
  ulong overrun_level;
  /*
     The reverse of overrun: the number of events below which the Worker is
     considered underrunning.
  */
  ulong underrun_level;
  /*
    Total of increments done to rli->mts_wq_excess_cnt on behalf of this Worker.
    When the WQ length drops below overrun_level, the counter is reset.
  */
  ulong excess_cnt;
  /*
    Coordinates of the last CheckPoint (CP) this Worker has
    acknowledged; part of its persistent data
  */
  char checkpoint_relay_log_name[FN_REFLEN];
  ulonglong checkpoint_relay_log_pos;
  char checkpoint_master_log_name[FN_REFLEN];
  ulonglong checkpoint_master_log_pos;
  MY_BITMAP group_executed; // bitmap describing groups executed after the last CP
  MY_BITMAP group_shifted;  // temporary bitmap to compute group_executed
  ulong checkpoint_seqno;   // the most significant ON bit in group_executed
  /* Initial value of the FD-for-execution version until it gets known. */
  ulong server_version;
  enum en_running_state
  {
    NOT_RUNNING= 0,
    RUNNING= 1,
    ERROR_LEAVING= 2,         // is set by the Worker
    STOP= 3,                  // is set by Coordinator upon receiving STOP
    STOP_ACCEPTED= 4          // is set by the Worker upon completing a job when STOP SLAVE is issued
  };

  /*
    This function is used to make a copy of the worker object before we
    destroy it on STOP SLAVE. This new object is then used to report the
    worker status until the next START SLAVE, following which the new worker
    objects will be used.
  */
  void copy_values_for_PFS(ulong worker_id, en_running_state running_status,
                           THD *worker_thd, const Error &last_error,
                           const Gtid_specification &currently_executing_gtid);

  /*
    The running status is guarded by the jobs_lock mutex, which a writer
    (Coordinator or the Worker itself) needs to hold when writing a new value.
  */
  en_running_state volatile running_status;
  /*
    exit_incremented indicates whether the Worker has contributed to the max
    updated index. By default it is set to false. When the Worker contributes
    for the first time this variable is set to true.
  */
  bool exit_incremented;

  int init_worker(Relay_log_info*, ulong);
  int rli_init_info(bool);
  int flush_info(bool force= FALSE);
  static size_t get_number_worker_fields();
  void slave_worker_ends_group(Log_event*, int);
  const char *get_master_log_name();
  ulonglong get_master_log_pos() { return master_log_pos; };
  ulonglong set_master_log_pos(ulong val) { return master_log_pos= val; };
  bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
  /*
    When a commit fails, clear the bitmap for the executed worker group.
    Revert the positions to the old positions that existed before the commit,
    using the checkpoint.

    @param ptr_g a pointer to the Slave_job_group struct instance which
    holds the group master log pos, group relay log pos and checkpoint
    positions.
  */
  void rollback_positions(Slave_job_group *ptr_g);
  bool reset_recovery_info();
  /**
    The method runs at Worker initialization, at runtime when
    Coordinator supplied a new FD event for the execution context, and at
    Worker pool shutdown.
    Similarly to the Coordinator's
    Relay_log_info::set_rli_description_event(), the possibly existing
    old FD is destroyed, carefully; each worker decrements
    Format_description_log_event::usage_counter, and when it reaches
    zero the destructor runs.
    Unlike in the Coordinator's role, the usage counter of the new FD is *not*
    incremented; see @c Log_event::get_slave_worker() for where and why that
    is done.

    Notice, the method is run as well by Coordinator per each Worker at MTS
    shutdown time.

    Todo: consider merging the logic of the method with that of the
    Relay_log_info class.

    @param fdle   pointer to a new Format_description_log_event
  */
  void set_rli_description_event(Format_description_log_event *fdle)
  {
    DBUG_ENTER("Slave_worker::set_rli_description_event");

    if (fdle)
    {
      /*
        When the master rotates its binary log, set gtid_next to
        NOT_YET_DETERMINED.  This tells the slave thread that:

        - If a Gtid_log_event is read subsequently, gtid_next will be
          set to the given GTID (this is done in
          gtid_pre_statement_checks()).

        - If a statement is executed before any Gtid_log_event, then
          gtid_next is set to anonymous (this is done in
          Gtid_log_event::do_apply_event()).

        It is important to not set GTID_NEXT=NOT_YET_DETERMINED in the
        middle of a transaction.  If that would happen when
        GTID_MODE=ON, the next statement would fail because it
        implicitly sets GTID_NEXT=ANONYMOUS, which is disallowed when
        GTID_MODE=ON.  So then there would be no way to end the
        transaction; any attempt to do so would result in this error.
        (It is not possible for the slave threads to have
        gtid_next.type==AUTOMATIC or UNDEFINED in the middle of a
        transaction, but it is possible for a client thread to have
        gtid_next.type==AUTOMATIC and issue a BINLOG statement
        containing this Format_description_log_event.)
      */
      if (!is_in_group() &&
          (info_thd->variables.gtid_next.type == AUTOMATIC_GROUP ||
           info_thd->variables.gtid_next.type == UNDEFINED_GROUP))
      {
        DBUG_PRINT("info", ("Setting gtid_next.type to NOT_YET_DETERMINED_GROUP"));
        info_thd->variables.gtid_next.set_not_yet_determined();
      }
      adapt_to_master_version_updown(fdle->get_product_version(),
                                     get_master_server_version());
    }
    if (rli_description_event)
    {
      assert(rli_description_event->usage_counter.atomic_get() > 0);

      if (rli_description_event->usage_counter.atomic_add(-1) == 1)
      {
        /* The FD being deleted by the Worker can't be the latest one */
        assert(rli_description_event != c_rli->get_rli_description_event());

        delete rli_description_event;
      }
    }
    rli_description_event= fdle;

    DBUG_VOID_RETURN;
  }

  inline void reset_gaq_index() { gaq_index= c_rli->gaq->size; };
  inline void set_gaq_index(ulong val)
  {
    if (gaq_index == c_rli->gaq->size)
      gaq_index= val;
  };

  int slave_worker_exec_event(Log_event *ev);
  /**
    Make the necessary changes to both the `Slave_worker` and current
    `Log_event` objects, before retrying to apply the transaction.

    Since the event is going to be re-read from the relay-log file, there
    may be actions needed to be taken to reset the state both of `this`
    instance, as well as of the current `Log_event` being processed.

    @param event The `Log_event` object currently being processed.
  */
  void prepare_for_retry(Log_event &event);
  bool retry_transaction(uint start_relay_number, my_off_t start_relay_pos,
                         uint end_relay_number, my_off_t end_relay_pos);

  bool set_info_search_keys(Rpl_info_handler *to);


  /**
    Get Coordinator's RLI. Especially used to get the rli from
    a slave thread, like this: thd->rli_slave->get_c_rli();
    thd could be a SQL thread or a worker thread.
  */
  virtual Relay_log_info* get_c_rli()
  {
    return c_rli;
  }

  /**
     @return an extension "for channel channel_name"
             for error messages per channel
  */
  const char* get_for_channel_str(bool upper_case= false) const;

  longlong sequence_number()
  {
    Slave_job_group* ptr_g= c_rli->gaq->get_job_group(gaq_index);
    return ptr_g->sequence_number;
  }

  bool found_order_commit_deadlock() { return m_order_commit_deadlock; }
  void report_order_commit_deadlock() { m_order_commit_deadlock= true; }
  /**
    @return either the master server version as extracted from the last
            installed Format_description_log_event, or, when none has been
            installed, the slave's own server version.
  */
  ulong get_master_server_version()
  {
    return !get_rli_description_event() ? server_version :
      get_rli_description_event()->get_product_version();
  }

  // overridden new and delete operators for 64 byte alignment
  static void* operator new(size_t request);
  static void operator delete(void * ptr);

protected:

  virtual void do_report(loglevel level, int err_code,
                         const char *msg, va_list v_args) const;

private:
  ulong gaq_index;          // GAQ index of the current assignment
  ulonglong master_log_pos; // event's cached log_pos for a possible error report
  void end_info();
  bool read_info(Rpl_info_handler *from);
  bool write_info(Rpl_info_handler *to);
  bool m_order_commit_deadlock;

  Slave_worker& operator=(const Slave_worker& info);
  Slave_worker(const Slave_worker& info);
  bool worker_sleep(ulong seconds);
  bool read_and_apply_events(uint start_relay_number, my_off_t start_relay_pos,
                             uint end_relay_number, my_off_t end_relay_pos);
  void assign_partition_db(Log_event *ev);

  void reset_order_commit_deadlock() { m_order_commit_deadlock= false; }
public:
  /**
     Returns an array with the expected column numbers of the primary key
     fields of the table repository.
  */
  static const uint *get_table_pk_field_indexes();
  /**
     Returns the index of the Channel_name field of the table repository.
  */
  static uint get_channel_field_index();

  /**
    This class aims to do cleanup for Workers in the retry_transaction()
    method.
  */
  class Retry_context_sentry {
  public:
    /**
      Constructor that initializes class objects and flags.
    */
    Retry_context_sentry(Slave_worker& parent);
    /**
       This destructor calls the clean() method, which performs the cleanup.
    */
    virtual ~Retry_context_sentry();
    /**
       Assignment operator to set the value of m_is_cleaned_up.

       @param is_cleaned_up The new value of the cleanup flag.
       @return a reference to this object.
    */
    Retry_context_sentry& operator=(bool is_cleaned_up);
    /**
       This method performs the cleanup and resets the m_order_commit_deadlock
       flag.
    */
    void clean();
  private:
    Slave_worker& m_parent;           // Reference to the enclosing class object.
    bool m_is_cleaned_up;             // Flag to check for cleanup.
  };
};

void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item);
bool set_max_updated_index_on_stop(Slave_worker *worker,
                                   Slave_job_item *job_item);

TABLE* mts_move_temp_table_to_entry(TABLE*, THD*, db_worker_hash_entry*);
TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*);
// Auxiliary function
TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*, enum_mts_parallel_type);

bool append_item_to_jobs(slave_job_item *job_item,
                         Slave_worker *w, Relay_log_info *rli);
Slave_job_item* de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);

inline Slave_worker* get_thd_worker(THD *thd)
{
  return static_cast<Slave_worker *>(thd->rli_slave);
}

#endif // HAVE_REPLICATION
#endif