1 /* Copyright (c) 2011, 2021, Oracle and/or its affiliates.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include "debug_sync.h"
24 #include "rpl_rli_pdb.h"
25 
26 #include "log.h"                            // sql_print_error
27 #include "rpl_slave_commit_order_manager.h" // Commit_order_manager
28 #include "rpl_msr.h"                        // for channel_map
29 
30 #include "pfs_file_provider.h"
31 #include "mysql/psi/mysql_file.h"
32 
/*
  Debug-only globals; they exist only when NDEBUG is not defined.

  w_rr: NOTE(review): not referenced in the visible part of this file;
        presumably a round-robin counter used by debug code elsewhere --
        confirm before relying on it.
  mts_debug_concurrent_access: incremented by
        Slave_worker::commit_positions() when the
        "mts_debug_concurrent_access" debug keyword is active.
*/
#ifndef NDEBUG
  ulong w_rr= 0;
  uint mts_debug_concurrent_access= 0;
#endif

/* NOTE(review): not used in the visible part of this file; confirm usage. */
#define HASH_DYNAMIC_INIT 4

using std::min;
using std::max;
42 
43 /**
44    This function is called by both coordinator and workers.
45 
46    Upon receiving the STOP command, the workers will identify a
47    maximum group index already executed (or under execution).
48 
49    All groups whose index are below or equal to the maximum
50    group index will be applied by the workers before stopping.
51 
52    The workers with groups above the maximum group index will
53    exit without applying these groups by setting their running
54    status to "STOP_ACCEPTED".
55 
56    @param worker    a pointer to the waiting Worker struct
57    @param job_item  a pointer to struct carrying a reference to an event
58 
59    @return true if STOP command gets accepted otherwise false is returned.
60 */
handle_slave_worker_stop(Slave_worker * worker,Slave_job_item * job_item)61 bool handle_slave_worker_stop(Slave_worker *worker,
62                               Slave_job_item *job_item)
63 {
64   ulonglong group_index= 0;
65   Relay_log_info *rli= worker->c_rli;
66   mysql_mutex_lock(&rli->exit_count_lock);
67   /*
68     First, W calculates a group-"at-hands" index which is
69     either the currently read ev group index, or the last executed
70     group's one when the  queue is empty.
71   */
72   group_index= (job_item->data)?
73     rli->gaq->get_job_group(job_item->data->mts_group_idx)->total_seqno:
74     worker->last_groups_assigned_index;
75 
76   /*
77     The max updated index is being updated as long as
78     exit_counter permits. That's stopped with the final W's
79     increment of it.
80   */
81   if (!worker->exit_incremented)
82   {
83     if (rli->exit_counter < rli->slave_parallel_workers)
84       rli->max_updated_index = max(rli->max_updated_index, group_index);
85 
86     ++rli->exit_counter;
87     worker->exit_incremented= true;
88     assert(!is_mts_worker(current_thd));
89   }
90 #ifndef NDEBUG
91   else
92     assert(is_mts_worker(current_thd));
93 #endif
94 
95   /*
96     Now let's decide about the deferred exit to consider
97     the empty queue and the counter value reached
98     slave_parallel_workers.
99   */
100   if (!job_item->data)
101   {
102     worker->running_status= Slave_worker::STOP_ACCEPTED;
103     mysql_cond_signal(&worker->jobs_cond);
104     mysql_mutex_unlock(&rli->exit_count_lock);
105     return(true);
106   }
107   else if (rli->exit_counter == rli->slave_parallel_workers)
108   {
109     //over steppers should exit with accepting STOP
110     if (group_index > rli->max_updated_index)
111     {
112       worker->running_status= Slave_worker::STOP_ACCEPTED;
113       mysql_cond_signal(&worker->jobs_cond);
114       mysql_mutex_unlock(&rli->exit_count_lock);
115       return(true);
116     }
117   }
118   mysql_mutex_unlock(&rli->exit_count_lock);
119   return(false);
120 }
121 
122 /**
123    This function is called by both coordinator and workers.
124    Both coordinator and workers contribute to max_updated_index.
125 
126    @param worker    a pointer to the waiting Worker struct
127    @param job_item  a pointer to struct carrying a reference to an event
128 
129    @return true if STOP command gets accepted otherwise false is returned.
130 */
set_max_updated_index_on_stop(Slave_worker * worker,Slave_job_item * job_item)131 bool set_max_updated_index_on_stop(Slave_worker *worker,
132                                    Slave_job_item *job_item)
133 {
134   head_queue(&worker->jobs, job_item);
135   if (worker->running_status == Slave_worker::STOP)
136   {
137     if (handle_slave_worker_stop(worker, job_item))
138       return true;
139   }
140   return false;
141 }
142 
/*
  Field names of the worker slave info repository, in storage order.
  Every time you add a new field to the worker slave info, update
  what follows. For now, this is only used to get the number of fields
  (see Slave_worker::get_number_worker_fields()).

  NOTE: the index of "channel_name" (12) must stay equal to
  LINE_FOR_CHANNEL, which is used as its key position.
*/
const char *info_slave_worker_fields []=
{
  "id",
  /*
    These positions identify what has been executed. Notice that they are
    redundant and only the group_master_log_name and group_master_log_pos
    are really necessary. However, the additional information is kept to
    ease debugging.
  */
  "group_relay_log_name",
  "group_relay_log_pos",
  "group_master_log_name",
  "group_master_log_pos",

  /*
    These positions identify what a worker knew about the coordinator at
    the time a job was assigned. Notice that they are redundant and are
    kept to ease debugging.
  */
  "checkpoint_relay_log_name",
  "checkpoint_relay_log_pos",
  "checkpoint_master_log_name",
  "checkpoint_master_log_pos",

  /*
    Identify the greatest job, i.e. group, processed by a worker.
  */
  "checkpoint_seqno",
  /*
    Maximum number of jobs that can be assigned to a worker. This
    information is necessary to read the next entry.
  */
  "checkpoint_group_size",
  /*
    Bitmap used to identify what jobs were processed by a worker.
  */
  "checkpoint_group_bitmap",
  /*
    Channel on which this worker is acting.
  */
  "channel_name"
};
189 
/*
  Number of records in the mts partition hash below which
  entries with zero usage are tolerated so they can be quickly
  recycled.
*/
const ulong mts_partition_hash_soft_max= 16;

/*
  Index values of some outstanding slots of info_slave_worker_fields.
  LINE_FOR_CHANNEL is the position of "channel_name".
*/
enum {
  LINE_FOR_CHANNEL= 12,
};

/*
  Field positions forming the worker repository's primary key:
  channel_name and id (see Slave_worker::set_info_search_keys()).
*/
const uint info_slave_worker_table_pk_field_indexes []=
{
  LINE_FOR_CHANNEL,
  0,
};
209 
Slave_worker(Relay_log_info * rli,PSI_mutex_key * param_key_info_run_lock,PSI_mutex_key * param_key_info_data_lock,PSI_mutex_key * param_key_info_sleep_lock,PSI_mutex_key * param_key_info_thd_lock,PSI_mutex_key * param_key_info_data_cond,PSI_mutex_key * param_key_info_start_cond,PSI_mutex_key * param_key_info_stop_cond,PSI_mutex_key * param_key_info_sleep_cond,uint param_id,const char * param_channel)210 Slave_worker::Slave_worker(Relay_log_info *rli
211 #ifdef HAVE_PSI_INTERFACE
212                            ,PSI_mutex_key *param_key_info_run_lock,
213                            PSI_mutex_key *param_key_info_data_lock,
214                            PSI_mutex_key *param_key_info_sleep_lock,
215                            PSI_mutex_key *param_key_info_thd_lock,
216                            PSI_mutex_key *param_key_info_data_cond,
217                            PSI_mutex_key *param_key_info_start_cond,
218                            PSI_mutex_key *param_key_info_stop_cond,
219                            PSI_mutex_key *param_key_info_sleep_cond
220 #endif
221                            , uint param_id, const char *param_channel
222                           )
223   : Relay_log_info(FALSE
224 #ifdef HAVE_PSI_INTERFACE
225                    ,param_key_info_run_lock, param_key_info_data_lock,
226                    param_key_info_sleep_lock, param_key_info_thd_lock,
227                    param_key_info_data_cond, param_key_info_start_cond,
228                    param_key_info_stop_cond, param_key_info_sleep_cond
229 #endif
230                    , param_id + 1, param_channel, true
231                   ),
232     c_rli(rli),
233     curr_group_exec_parts(key_memory_db_worker_hash_entry),
234     id(param_id),
235     checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0),
236     checkpoint_seqno(0), running_status(NOT_RUNNING), exit_incremented(false)
237 {
238   /*
239     In the future, it would be great if we use only one identifier.
240     So when factoring out this code, please, consider this.
241   */
242   assert(internal_id == id + 1);
243   checkpoint_relay_log_name[0]= 0;
244   checkpoint_master_log_name[0]= 0;
245 
246   mysql_mutex_init(key_mutex_slave_parallel_worker, &jobs_lock,
247                    MY_MUTEX_INIT_FAST);
248   mysql_cond_init(key_cond_slave_parallel_worker, &jobs_cond);
249   mysql_cond_init(key_cond_mts_gaq, &logical_clock_cond);
250 }
251 
~Slave_worker()252 Slave_worker::~Slave_worker()
253 {
254   end_info();
255   if (jobs.inited_queue)
256   {
257     assert(jobs.m_Q.size() == jobs.size);
258     jobs.m_Q.clear();
259   }
260   mysql_mutex_destroy(&jobs_lock);
261   mysql_cond_destroy(&jobs_cond);
262   mysql_cond_destroy(&logical_clock_cond);
263   mysql_mutex_lock(&info_thd_lock);
264   info_thd= NULL;
265   mysql_mutex_unlock(&info_thd_lock);
266   set_rli_description_event(NULL);
267 }
268 
/**
   Method is executed by the Coordinator at Worker startup time to
   initialize members, partly with values supplied by the Coordinator
   through rli.

   Order of work:
   - rli_init_info(false) runs first. On failure the method returns 1
     before any of the run-time members below are reset.
   - Notification flags, counters and the job queue are reset; the queue
     is sized to c_rli->mts_slave_worker_queue_len_max.
   - The underrun/overrun thresholds are derived from
     rli->mts_worker_underrun_level, a percentage of the queue size.
   - A Worker-private Mts_submode of the Coordinator's type is allocated.

   @param  rli  Coordinator's Relay_log_info pointer
   @param  i    identifier of the Worker

   @return 0          success
           non-zero   failure
*/
int Slave_worker::init_worker(Relay_log_info * rli, ulong i)
{
  DBUG_ENTER("Slave_worker::init_worker");
  assert(!rli->info_thd->is_error());

  // Filler value used to size the job queue below.
  Slave_job_item empty= Slave_job_item();

  c_rli= rli;
  set_commit_order_manager(c_rli->get_commit_order_manager());

  if (rli_init_info(false) ||
      DBUG_EVALUATE_IF("inject_init_worker_init_info_fault", true, false))
    DBUG_RETURN(1);

  id= i;
  curr_group_exec_parts.clear();
  relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
  checkpoint_notified= FALSE;       // the same as above
  master_log_change_notified= false;// W learns master log during 1st group exec
  fd_change_notified= false; // W is to learn master FD version same as above
  server_version= version_product(rli->slave_version_split);
  bitmap_shifted= 0;
  workers= c_rli->workers; // shallow copying is sufficient
  wq_empty_waits= wq_size_waits_cnt= groups_done= events_done= curr_jobs= 0;
  usage_partition= 0;
  end_group_sets_max_dbs= false;
  gaq_index= last_group_done_index= c_rli->gaq->size; // out of range
  last_groups_assigned_index=0;
  // The queue must not have been initialised by an earlier call.
  assert(!jobs.inited_queue);
  jobs.avail= 0;
  jobs.len= 0;
  jobs.overfill= FALSE;    //  todo: move into Slave_jobs_queue constructor
  jobs.waited_overfill= 0;
  jobs.entry= jobs.size= c_rli->mts_slave_worker_queue_len_max;
  jobs.inited_queue= true;
  curr_group_seen_begin= curr_group_seen_gtid= false;
#ifndef NDEBUG
  curr_group_seen_sequence_number= false;
#endif
  jobs.m_Q.resize(jobs.size, empty);
  assert(jobs.m_Q.size() == jobs.size);

  wq_overrun_cnt= excess_cnt= 0;
  // mts_worker_underrun_level is a percentage of the queue length.
  underrun_level= (ulong) ((rli->mts_worker_underrun_level * jobs.size) / 100.0);
  // overrun level is symmetric to underrun (as underrun to the full queue)
  overrun_level= jobs.size - underrun_level;

  /* create mts submode for each of the workers. */
  current_mts_submode=
    (rli->channel_mts_submode == MTS_PARALLEL_TYPE_DB_NAME)?
       (Mts_submode*) new Mts_submode_database():
       (Mts_submode*) new Mts_submode_logical_clock();

  //workers and coordinator must be of the same type
  assert(rli->current_mts_submode->get_type() ==
         current_mts_submode->get_type());

  m_order_commit_deadlock= false;
  DBUG_RETURN(0);
}
339 
340 /**
341    A part of Slave worker iitializer that provides a
342    minimum context for MTS recovery.
343 
344    @param is_gaps_collecting_phase
345 
346           clarifies what state the caller
347           executes this method from. When it's @c true
348           that is @c mts_recovery_groups() and Worker should
349           restore the last session time info which is processed
350           to collect gaps that is not executed transactions (groups).
351           Such recovery Slave_worker intance is destroyed at the end of
352           @c mts_recovery_groups().
353           Whet it's @c false Slave_worker is initialized for the run time
354           nad should not read the last session time stale info.
355           Its info will be ultimately reset once all gaps are executed
356           to finish off recovery.
357 
358    @return 0 on success, non-zero for a failure
359 */
rli_init_info(bool is_gaps_collecting_phase)360 int Slave_worker::rli_init_info(bool is_gaps_collecting_phase)
361 {
362   enum_return_check return_check= ERROR_CHECKING_REPOSITORY;
363 
364   DBUG_ENTER("Slave_worker::rli_init_info");
365 
366   if (inited)
367     DBUG_RETURN(0);
368 
369   /*
370     Worker bitmap size depends on recovery mode.
371     If it is gaps collecting the bitmaps must be capable to accept
372     up to MTS_MAX_BITS_IN_GROUP of bits.
373   */
374   size_t num_bits= is_gaps_collecting_phase ?
375     MTS_MAX_BITS_IN_GROUP : c_rli->checkpoint_group;
376   /*
377     This checks if the repository was created before and thus there
378     will be values to be read. Please, do not move this call after
379     the handler->init_info().
380   */
381   return_check= check_info();
382   if (return_check == ERROR_CHECKING_REPOSITORY ||
383       (return_check == REPOSITORY_DOES_NOT_EXIST && is_gaps_collecting_phase))
384     goto err;
385 
386   if (handler->init_info())
387     goto err;
388 
389   bitmap_init(&group_executed, NULL, num_bits, FALSE);
390   bitmap_init(&group_shifted, NULL, num_bits, FALSE);
391 
392   if (is_gaps_collecting_phase &&
393       (DBUG_EVALUATE_IF("mts_slave_worker_init_at_gaps_fails", true, false) ||
394        read_info(handler)))
395   {
396     bitmap_free(&group_executed);
397     bitmap_free(&group_shifted);
398     goto err;
399   }
400   inited= 1;
401 
402   DBUG_RETURN(0);
403 
404 err:
405   // todo: handler->end_info(uidx, nidx);
406   inited= 0;
407   sql_print_error("Error reading slave worker configuration");
408   DBUG_RETURN(1);
409 }
410 
end_info()411 void Slave_worker::end_info()
412 {
413   DBUG_ENTER("Slave_worker::end_info");
414 
415   if (!inited)
416     DBUG_VOID_RETURN;
417 
418   if (handler)
419     handler->end_info();
420 
421   if (inited)
422   {
423     bitmap_free(&group_executed);
424     bitmap_free(&group_shifted);
425   }
426   inited = 0;
427 
428   DBUG_VOID_RETURN;
429 }
430 
flush_info(const bool force)431 int Slave_worker::flush_info(const bool force)
432 {
433   DBUG_ENTER("Slave_worker::flush_info");
434 
435   if (!inited)
436     DBUG_RETURN(0);
437 
438   /*
439     We update the sync_period at this point because only here we
440     now that we are handling a Slave_worker. This needs to be
441     update every time we call flush because the option may be
442     dinamically set.
443   */
444   handler->set_sync_period(sync_relayloginfo_period);
445 
446   if (write_info(handler))
447     goto err;
448 
449   if (handler->flush_info(force))
450     goto err;
451 
452   DBUG_RETURN(0);
453 
454 err:
455   sql_print_error("Error writing slave worker configuration");
456   DBUG_RETURN(1);
457 }
458 
read_info(Rpl_info_handler * from)459 bool Slave_worker::read_info(Rpl_info_handler *from)
460 {
461   DBUG_ENTER("Slave_worker::read_info");
462 
463   ulong temp_group_relay_log_pos= 0;
464   char temp_group_master_log_name[FN_REFLEN];
465   ulong temp_group_master_log_pos= 0;
466   ulong temp_checkpoint_relay_log_pos= 0;
467   ulong temp_checkpoint_master_log_pos= 0;
468   ulong temp_checkpoint_seqno= 0;
469   ulong nbytes= 0;
470   uchar *buffer= (uchar *) group_executed.bitmap;
471   int temp_internal_id= 0;
472 
473   if (from->prepare_info_for_read())
474     DBUG_RETURN(TRUE);
475 
476   if (from->get_info(&temp_internal_id, 0) ||
477       from->get_info(group_relay_log_name,
478                      sizeof(group_relay_log_name),
479                      (char *) "") ||
480       from->get_info(&temp_group_relay_log_pos,
481                      0UL) ||
482       from->get_info(temp_group_master_log_name,
483                      sizeof(temp_group_master_log_name),
484                      (char *) "") ||
485       from->get_info(&temp_group_master_log_pos,
486                      0UL) ||
487       from->get_info(checkpoint_relay_log_name,
488                      sizeof(checkpoint_relay_log_name),
489                      (char *) "") ||
490       from->get_info(&temp_checkpoint_relay_log_pos,
491                      0UL) ||
492       from->get_info(checkpoint_master_log_name,
493                      sizeof(checkpoint_master_log_name),
494                      (char *) "") ||
495       from->get_info(&temp_checkpoint_master_log_pos,
496                      0UL) ||
497       from->get_info(&temp_checkpoint_seqno,
498                      0UL) ||
499       from->get_info(&nbytes, 0UL) ||
500       from->get_info(buffer, (size_t) nbytes,
501                      (uchar *) 0) ||
502       /* default is empty string */
503       from->get_info(channel, sizeof(channel),(char*)""))
504     DBUG_RETURN(TRUE);
505 
506   assert(nbytes <= no_bytes_in_map(&group_executed));
507 
508   internal_id=(uint) temp_internal_id;
509   group_relay_log_pos=  temp_group_relay_log_pos;
510   set_group_master_log_name(temp_group_master_log_name);
511   set_group_master_log_pos(temp_group_master_log_pos);
512   checkpoint_relay_log_pos=  temp_checkpoint_relay_log_pos;
513   checkpoint_master_log_pos= temp_checkpoint_master_log_pos;
514   checkpoint_seqno= temp_checkpoint_seqno;
515 
516   DBUG_RETURN(FALSE);
517 }
518 
519 /*
520   This function is used to make a copy of the worker object before we
521   destroy it while STOP SLAVE. This new object is then used to report the
522   worker status until next START SLAVE following which the new worker objetcs
523   will be used.
524 */
copy_values_for_PFS(ulong worker_id,en_running_state thd_running_status,THD * worker_thd,const Error & last_error,const Gtid_specification & gtid)525 void Slave_worker::copy_values_for_PFS(ulong worker_id,
526                                        en_running_state thd_running_status,
527                                        THD *worker_thd,
528                                        const Error &last_error,
529                                        const Gtid_specification &gtid)
530 {
531   id= worker_id;
532   running_status= thd_running_status;
533   info_thd= worker_thd;
534   m_last_error= last_error;
535   currently_executing_gtid= gtid;
536 }
537 
set_info_search_keys(Rpl_info_handler * to)538 bool Slave_worker::set_info_search_keys(Rpl_info_handler *to)
539 {
540   DBUG_ENTER("Slave_worker::set_info_search_keys");
541 
542   /* primary keys are Id and channel_name */
543   if(to->set_info(0, (int)internal_id ) || to->set_info(LINE_FOR_CHANNEL, channel))
544     DBUG_RETURN(TRUE);
545 
546   DBUG_RETURN(FALSE);
547 }
548 
write_info(Rpl_info_handler * to)549 bool Slave_worker::write_info(Rpl_info_handler *to)
550 {
551   DBUG_ENTER("Master_info::write_info");
552 
553   ulong nbytes= (ulong) no_bytes_in_map(&group_executed);
554   uchar *buffer= (uchar*) group_executed.bitmap;
555   assert(nbytes <= (c_rli->checkpoint_group + 7) / 8);
556 
557   if (to->prepare_info_for_write() ||
558       to->set_info((int) internal_id) ||
559       to->set_info(group_relay_log_name) ||
560       to->set_info((ulong) group_relay_log_pos) ||
561       to->set_info(get_group_master_log_name()) ||
562       to->set_info((ulong) get_group_master_log_pos()) ||
563       to->set_info(checkpoint_relay_log_name) ||
564       to->set_info((ulong) checkpoint_relay_log_pos) ||
565       to->set_info(checkpoint_master_log_name) ||
566       to->set_info((ulong) checkpoint_master_log_pos) ||
567       to->set_info((ulong) checkpoint_seqno) ||
568       to->set_info(nbytes) ||
569       to->set_info(buffer, (size_t) nbytes)||
570       to->set_info(channel))
571     DBUG_RETURN(TRUE);
572 
573   DBUG_RETURN(FALSE);
574 }
575 
/**
   Clean up the part of the Worker info table that is regarded in
   gaps collecting at recovery.
   This worker won't contribute to the recovery bitmap at a future
   slave restart (see @c mts_recovery_groups).

   Clears the group master log name/position. When info_thd exists and
   does not already hold the backup binlog lock, the protection is
   acquired (waiting up to lock_wait_timeout) around that update, then
   released. The state is then flushed with force=true.
   NOTE(review): flush_info() runs after the protection is released;
   confirm that is intended.

   @return FALSE as success, TRUE as failure
*/
bool Slave_worker::reset_recovery_info()
{
  bool binlog_prot_acquired= false;

  DBUG_ENTER("Slave_worker::reset_recovery_info");

  if (info_thd && !info_thd->backup_binlog_lock.is_acquired())
  {
    const ulong timeout= info_thd->variables.lock_wait_timeout;

    DBUG_PRINT("debug", ("Acquiring binlog protection lock"));

    if (info_thd->backup_binlog_lock.acquire_protection(info_thd, MDL_EXPLICIT,
                                                        timeout))
      DBUG_RETURN(true);

    binlog_prot_acquired= true;
  }

  set_group_master_log_name("");
  set_group_master_log_pos(0);

  // Release only what this call acquired.
  if (binlog_prot_acquired)
  {
    DBUG_PRINT("debug", ("Releasing binlog protection lock"));
    info_thd->backup_binlog_lock.release_protection(info_thd);
  }


  DBUG_RETURN(flush_info(true));
}
615 
get_number_worker_fields()616 size_t Slave_worker::get_number_worker_fields()
617 {
618   return sizeof(info_slave_worker_fields)/sizeof(info_slave_worker_fields[0]);
619 }
620 
get_master_log_name()621 const char* Slave_worker::get_master_log_name()
622 {
623   Slave_job_group* ptr_g= c_rli->gaq->get_job_group(gaq_index);
624 
625   return (ptr_g->checkpoint_log_name != NULL) ?
626     ptr_g->checkpoint_log_name : checkpoint_master_log_name;
627 }
628 
commit_positions(Log_event * ev,Slave_job_group * ptr_g,bool force)629 bool Slave_worker::commit_positions(Log_event *ev, Slave_job_group* ptr_g, bool force)
630 {
631   bool binlog_prot_acquired= false;
632   DBUG_ENTER("Slave_worker::checkpoint_positions");
633 
634   if (info_thd && !info_thd->backup_binlog_lock.is_acquired())
635   {
636     const ulong timeout= info_thd->variables.lock_wait_timeout;
637 
638     DBUG_PRINT("debug", ("Acquiring binlog protection lock"));
639 
640     if (info_thd->backup_binlog_lock.acquire_protection(info_thd, MDL_EXPLICIT,
641                                                         timeout))
642       DBUG_RETURN(true);
643 
644     binlog_prot_acquired= true;
645   }
646 
647   /*
648     Initial value of checkpoint_master_log_name is learned from
649     group_master_log_name. The latter can be passed to Worker
650     at rare event of master binlog rotation.
651     This initialization is needed to provide to Worker info
652     on physical coordiates during execution of the very first group
653     after a rotation.
654   */
655   if (ptr_g->group_master_log_name != NULL)
656   {
657     set_group_master_log_name(ptr_g->group_master_log_name);
658     my_free(ptr_g->group_master_log_name);
659     ptr_g->group_master_log_name= NULL;
660     strmake(checkpoint_master_log_name, get_group_master_log_name(),
661             sizeof(checkpoint_master_log_name) - 1);
662   }
663   if (ptr_g->checkpoint_log_name != NULL)
664   {
665     strmake(checkpoint_relay_log_name, ptr_g->checkpoint_relay_log_name,
666             sizeof(checkpoint_relay_log_name) - 1);
667     checkpoint_relay_log_pos= ptr_g->checkpoint_relay_log_pos;
668     strmake(checkpoint_master_log_name, ptr_g->checkpoint_log_name,
669             sizeof(checkpoint_master_log_name) - 1);
670     checkpoint_master_log_pos= ptr_g->checkpoint_log_pos;
671 
672     my_free(ptr_g->checkpoint_log_name);
673     ptr_g->checkpoint_log_name= NULL;
674     my_free(ptr_g->checkpoint_relay_log_name);
675     ptr_g->checkpoint_relay_log_name= NULL;
676 
677     bitmap_copy(&group_shifted, &group_executed);
678     bitmap_clear_all(&group_executed);
679     for (uint pos= ptr_g->shifted; pos < c_rli->checkpoint_group; pos++)
680     {
681       if (bitmap_is_set(&group_shifted, pos))
682         bitmap_set_bit(&group_executed, pos - ptr_g->shifted);
683     }
684   }
685   /*
686     Extracts an updated relay-log name to store in Worker's rli.
687   */
688   if (ptr_g->group_relay_log_name)
689   {
690     assert(strlen(ptr_g->group_relay_log_name) + 1
691            <= sizeof(group_relay_log_name));
692     strmake(group_relay_log_name, ptr_g->group_relay_log_name,
693             sizeof(group_relay_log_name) - 1);
694   }
695 
696   assert(ptr_g->checkpoint_seqno <= (c_rli->checkpoint_group - 1));
697 
698   bitmap_set_bit(&group_executed, ptr_g->checkpoint_seqno);
699   checkpoint_seqno= ptr_g->checkpoint_seqno;
700   group_relay_log_pos= ev->future_event_relay_log_pos;
701   set_group_master_log_pos(ev->common_header->log_pos);
702 
703   /*
704     Directly accessing c_rli->get_group_master_log_name() does not
705     represent a concurrency issue because the current code places
706     a synchronization point when master rotates.
707   */
708   set_group_master_log_name(c_rli->get_group_master_log_name());
709 
710   DBUG_PRINT("mts", ("Committing worker-id %lu group master log pos %llu "
711              "group master log name %s checkpoint sequence number %lu.",
712                      id, get_group_master_log_pos(),
713                      get_group_master_log_name(), checkpoint_seqno));
714 
715   DBUG_EXECUTE_IF("mts_debug_concurrent_access",
716     {
717       mts_debug_concurrent_access++;
718     };
719   );
720 
721   if (binlog_prot_acquired)
722   {
723     DBUG_PRINT("debug", ("Releasing binlog protection lock"));
724     info_thd->backup_binlog_lock.release_protection(info_thd);
725   }
726 
727   DBUG_RETURN(flush_info(force));
728 }
729 
rollback_positions(Slave_job_group * ptr_g)730 void Slave_worker::rollback_positions(Slave_job_group* ptr_g)
731 {
732   if (!is_transactional())
733   {
734     bitmap_clear_bit(&group_executed, ptr_g->checkpoint_seqno);
735     flush_info(false);
736   }
737 }
738 
get_key(const uchar * record,size_t * length,my_bool not_used MY_ATTRIBUTE ((unused)))739 extern "C" uchar *get_key(const uchar *record, size_t *length,
740                           my_bool not_used MY_ATTRIBUTE((unused)))
741 {
742   DBUG_ENTER("get_key");
743 
744   db_worker_hash_entry *entry=(db_worker_hash_entry *) record;
745   *length= strlen(entry->db);
746 
747   DBUG_PRINT("info", ("get_key  %s, %d", entry->db, (int) *length));
748 
749   DBUG_RETURN((uchar*) entry->db);
750 }
751 
752 
free_entry(db_worker_hash_entry * entry)753 static void free_entry(db_worker_hash_entry *entry)
754 {
755   THD *c_thd= current_thd;
756 
757   DBUG_ENTER("free_entry");
758 
759   DBUG_PRINT("info", ("free_entry %s, %zu", entry->db, strlen(entry->db)));
760 
761   assert(c_thd->system_thread == SYSTEM_THREAD_SLAVE_SQL);
762 
763   /*
764     Although assert is correct valgrind senses entry->worker can be freed.
765 
766     assert(entry->usage == 0 ||
767     !entry->worker    ||  // last entry owner could have errored out
768     entry->worker->running_status != Slave_worker::RUNNING);
769   */
770 
771   mts_move_temp_tables_to_thd(c_thd, entry->temporary_tables);
772   entry->temporary_tables= NULL;
773 
774   my_free((void *) entry->db);
775   my_free(entry);
776 
777   DBUG_VOID_RETURN;
778 }
779 
init_hash_workers(Relay_log_info * rli)780 bool init_hash_workers(Relay_log_info *rli)
781 {
782   DBUG_ENTER("init_hash_workers");
783 
784   rli->inited_hash_workers=
785     (my_hash_init(&rli->mapping_db_to_worker, &my_charset_bin,
786                  0, 0, 0, get_key,
787                  (my_hash_free_key) free_entry, 0,
788                  key_memory_db_worker_hash_entry) == 0);
789   if (rli->inited_hash_workers)
790   {
791     mysql_mutex_init(key_mutex_slave_worker_hash, &rli->slave_worker_hash_lock,
792                      MY_MUTEX_INIT_FAST);
793     mysql_cond_init(key_cond_slave_worker_hash, &rli->slave_worker_hash_cond);
794   }
795 
796   DBUG_RETURN (!rli->inited_hash_workers);
797 }
798 
destroy_hash_workers(Relay_log_info * rli)799 void destroy_hash_workers(Relay_log_info *rli)
800 {
801   DBUG_ENTER("destroy_hash_workers");
802   if (rli->inited_hash_workers)
803   {
804     my_hash_free(&rli->mapping_db_to_worker);
805     mysql_mutex_destroy(&rli->slave_worker_hash_lock);
806     mysql_cond_destroy(&rli->slave_worker_hash_cond);
807     rli->inited_hash_workers= false;
808   }
809 
810   DBUG_VOID_RETURN;
811 }
812 
813 /**
814    Relocating temporary table reference into @c entry's table list head.
815    Sources can be the coordinator's and the Worker's thd->temporary_tables.
816 
817    @param table   TABLE instance pointer
818    @param thd     THD instance pointer of the source of relocation
819    @param entry   db_worker_hash_entry instance pointer
820 
821    @note  thd->temporary_tables can become NULL
822 
823    @return the pointer to a table following the unlinked
824 */
mts_move_temp_table_to_entry(TABLE * table,THD * thd,db_worker_hash_entry * entry)825 TABLE* mts_move_temp_table_to_entry(TABLE *table, THD *thd,
826                                     db_worker_hash_entry *entry)
827 {
828   TABLE *ret= table->next;
829 
830   if (table->prev)
831   {
832     table->prev->next= table->next;
833     if (table->prev->next)
834       table->next->prev= table->prev;
835   }
836   else
837   {
838     /* removing the first item from the list */
839     assert(table == thd->temporary_tables);
840 
841     thd->temporary_tables= table->next;
842     if (thd->temporary_tables)
843       table->next->prev= 0;
844   }
845   table->next= entry->temporary_tables;
846   table->prev= 0;
847   if (table->next)
848     table->next->prev= table;
849   entry->temporary_tables= table;
850 
851   return ret;
852 }
853 
854 
855 /**
856    Relocation of the list of temporary tables to thd->temporary_tables.
857 
858    @param thd     THD instance pointer of the destination
859    @param temporary_tables
860                   the source temporary_tables list
861 
862    @note     destroying references to the source list, if necessary,
863              is left to the caller.
864 
865    @return   the post-merge value of thd->temporary_tables.
866 */
mts_move_temp_tables_to_thd(THD * thd,TABLE * temporary_tables)867 TABLE* mts_move_temp_tables_to_thd(THD *thd, TABLE *temporary_tables)
868 {
869   DBUG_ENTER ("mts_move_temp_tables_to_thd");
870   TABLE *table= temporary_tables;
871   if (!table)
872     DBUG_RETURN(NULL);
873 
874   // accept only if this is the start of the list.
875   assert(!table->prev);
876 
877   // walk along the source list and associate the tables with thd
878   do
879   {
880     table->in_use= thd;
881   } while(table->next && (table= table->next));
882 
883   // link the former list against the tail of the source list
884   if (thd->temporary_tables)
885     thd->temporary_tables->prev= table;
886   table->next= thd->temporary_tables;
887   thd->temporary_tables= temporary_tables;
888   DBUG_RETURN(thd->temporary_tables);
889 }
890 
891 /**
892    Relocating references of temporary tables of a database
893    of the entry argument from THD into the entry.
894 
895    @param thd    THD pointer of the source temporary_tables list
896    @param entry  a pointer to db_worker_hash_entry record
897                  containing database descriptor and temporary_tables list.
898 
899 */
move_temp_tables_to_entry(THD * thd,db_worker_hash_entry * entry)900 static void move_temp_tables_to_entry(THD* thd, db_worker_hash_entry* entry)
901 {
902   for (TABLE *table= thd->temporary_tables; table;)
903   {
904     if (strcmp(table->s->db.str, entry->db) == 0)
905     {
906       // table pointer is shifted inside the function
907       table= mts_move_temp_table_to_entry(table, thd, entry);
908     }
909     else
910     {
911       table= table->next;
912     }
913   }
914 }
915 
916 
917 /**
918    The function produces a reference to the struct of a Worker
919    that has been or will be engaged to process the @c dbname -keyed  partition (D).
920    It checks a local to Coordinator CGAP list first and returns
921    @c last_assigned_worker when found (todo: assert).
922 
923    Otherwise, the partition is appended to the current group list:
924 
925         CGAP .= D
926 
927    here .= is concatenate operation,
928    and a possible D's Worker id is searched in Assigned Partition Hash
929    (APH) that collects tuples (P, W_id, U, mutex, cond).
930    In case not found,
931 
932         W_d := W_c unless W_c is NULL.
933 
934    When W_c is NULL it is assigned to a least occupied as defined by
935    @c get_least_occupied_worker().
936 
937         W_d := W_c := W_{least_occupied}
938 
939         APH .=  a new (D, W_d, 1)
940 
941    In a case APH contains W_d == W_c, (assert U >= 1)
942 
943         update APH set  U++ where  APH.P = D
944 
945    The case APH contains a W_d != W_c != NULL assigned to D-partition represents
946    the hashing conflict and is handled as the following:
947 
948      a. marks the record of APH with a flag requesting to signal in the
949         cond var when `U' the usage counter drops to zero by the other Worker;
950      b. waits for the other Worker to finish tasks on that partition and
951         gets the signal;
952      c. updates the APH record to point to the first Worker (naturally, U := 1),
953         scheduled the event, and goes back into the parallel mode
954 
955    @param  dbname      pointer to c-string containing database name
956                        It can be empty string to indicate specific locking
957                        to faciliate sequential applying.
958    @param  rli         pointer to Coordinators relay-log-info instance
959    @param  ptr_entry   reference to a pointer to the resulted entry in
960                        the Assigne Partition Hash where
961                        the entry's pointer is stored at return.
962    @param  need_temp_tables
963                        if FALSE migration of temporary tables not needed
964    @param  last_worker caller opts for this Worker, it must be
965                        rli->last_assigned_worker if one is determined.
966 
967    @note modifies  CGAP, APH and unlinks @c dbname -keyd temporary tables
968          from C's thd->temporary_tables to move them into the entry record.
969 
970    @return the pointer to a Worker struct
971 */
map_db_to_worker(const char * dbname,Relay_log_info * rli,db_worker_hash_entry ** ptr_entry,bool need_temp_tables,Slave_worker * last_worker)972 Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
973                                db_worker_hash_entry **ptr_entry,
974                                bool need_temp_tables, Slave_worker *last_worker)
975 {
976   Slave_worker_array *workers= &rli->workers;
977 
978   THD *thd= rli->info_thd;
979 
980   DBUG_ENTER("map_db_to_worker");
981 
982   assert(!rli->last_assigned_worker ||
983          rli->last_assigned_worker == last_worker);
984   assert(is_mts_db_partitioned(rli));
985 
986   if (!rli->inited_hash_workers)
987     DBUG_RETURN(NULL);
988 
989   db_worker_hash_entry *entry= NULL;
990   my_hash_value_type hash_value;
991   size_t dblength= strlen(dbname);
992 
993 
994   // Search in CGAP
995   for (db_worker_hash_entry **it= rli->curr_group_assigned_parts.begin();
996        it != rli->curr_group_assigned_parts.end(); ++it)
997   {
998     entry= *it;
999     if ((uchar) entry->db_len != dblength)
1000       continue;
1001     else
1002       if (strncmp(entry->db, const_cast<char*>(dbname), dblength) == 0)
1003       {
1004         *ptr_entry= entry;
1005         DBUG_RETURN(last_worker);
1006       }
1007   }
1008 
1009   DBUG_PRINT("info", ("Searching for %s, %zu", dbname, dblength));
1010 
1011   hash_value= my_calc_hash(&rli->mapping_db_to_worker, (uchar*) dbname,
1012                            dblength);
1013 
1014   mysql_mutex_lock(&rli->slave_worker_hash_lock);
1015 
1016   entry= (db_worker_hash_entry *)
1017     my_hash_search_using_hash_value(&rli->mapping_db_to_worker, hash_value,
1018                                     (uchar*) dbname, dblength);
1019   if (!entry)
1020   {
1021     /*
1022       The database name was not found which means that a worker never
1023       processed events from that database. In such case, we need to
1024       map the database to a worker my inserting an entry into the
1025       hash map.
1026     */
1027     my_bool ret;
1028     char *db= NULL;
1029 
1030     mysql_mutex_unlock(&rli->slave_worker_hash_lock);
1031 
1032     DBUG_PRINT("info", ("Inserting %s, %zu", dbname, dblength));
1033     /*
1034       Allocate an entry to be inserted and if the operation fails
1035       an error is returned.
1036     */
1037     if (!(db= (char *) my_malloc(key_memory_db_worker_hash_entry,
1038                                  dblength + 1, MYF(0))))
1039       goto err;
1040     if (!(entry= (db_worker_hash_entry *)
1041           my_malloc(key_memory_db_worker_hash_entry,
1042                     sizeof(db_worker_hash_entry), MYF(0))))
1043     {
1044       my_free(db);
1045       goto err;
1046     }
1047     my_stpcpy(db, dbname);
1048     entry->db= db;
1049     entry->db_len= strlen(db);
1050     entry->usage= 1;
1051     entry->temporary_tables= NULL;
1052     /*
1053       Unless \exists the last assigned Worker, get a free worker based
1054       on a policy described in the function get_least_occupied_worker().
1055     */
1056     mysql_mutex_lock(&rli->slave_worker_hash_lock);
1057 
1058     entry->worker= (!last_worker) ?
1059       get_least_occupied_worker(rli, workers, NULL) : last_worker;
1060     entry->worker->usage_partition++;
1061     if (rli->mapping_db_to_worker.records > mts_partition_hash_soft_max)
1062     {
1063       /*
1064         A dynamic array to store the mapping_db_to_worker hash elements
1065         that needs to be deleted, since deleting the hash entires while
1066         iterating over it is wrong.
1067       */
1068       Prealloced_array<db_worker_hash_entry*, HASH_DYNAMIC_INIT>
1069         hash_element(key_memory_db_worker_hash_entry);
1070       /*
1071         remove zero-usage (todo: rare or long ago scheduled) records.
1072         Store the element of the hash in a dynamic array after checking whether
1073         the usage of the hash entry is 0 or not. We later free it from the HASH.
1074       */
1075       for (uint i= 0; i < rli->mapping_db_to_worker.records; i++)
1076       {
1077         assert(!entry->temporary_tables || !entry->temporary_tables->prev);
1078         assert(!thd->temporary_tables || !thd->temporary_tables->prev);
1079 
1080         db_worker_hash_entry *entry=
1081           (db_worker_hash_entry*) my_hash_element(&rli->mapping_db_to_worker, i);
1082 
1083         if (entry->usage == 0)
1084         {
1085           mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
1086           entry->temporary_tables= NULL;
1087 
1088           /* Push the element in the dynamic array*/
1089           hash_element.push_back(entry);
1090         }
1091       }
1092 
1093       /* Delete the hash element based on the usage */
1094       for (size_t i=0 ; i < hash_element.size(); i++)
1095       {
1096         db_worker_hash_entry *temp_entry= hash_element[i];
1097         my_hash_delete(&rli->mapping_db_to_worker, (uchar*) temp_entry);
1098       }
1099     }
1100 
1101     ret= my_hash_insert(&rli->mapping_db_to_worker, (uchar*) entry);
1102 
1103     if (ret)
1104     {
1105       my_free(db);
1106       my_free(entry);
1107       entry= NULL;
1108       goto err;
1109     }
1110     DBUG_PRINT("info", ("Inserted %s, %zu", entry->db, strlen(entry->db)));
1111   }
1112   else
1113   {
1114     /* There is a record. Either  */
1115     if (entry->usage == 0)
1116     {
1117       entry->worker= (!last_worker) ?
1118         get_least_occupied_worker(rli, workers, NULL) : last_worker;
1119       entry->worker->usage_partition++;
1120       entry->usage++;
1121     }
1122     else if (entry->worker == last_worker || !last_worker)
1123     {
1124 
1125       assert(entry->worker);
1126 
1127       entry->usage++;
1128     }
1129     else
1130     {
1131       // The case APH contains a W_d != W_c != NULL assigned to
1132       // D-partition represents
1133       // the hashing conflict and is handled as the following:
1134       PSI_stage_info old_stage;
1135 
1136       assert(last_worker != NULL &&
1137              rli->curr_group_assigned_parts.size() > 0);
1138 
1139       // future assignenment and marking at the same time
1140       entry->worker= last_worker;
1141       // loop while a user thread is stopping Coordinator gracefully
1142       do
1143       {
1144         thd->ENTER_COND(&rli->slave_worker_hash_cond,
1145                                    &rli->slave_worker_hash_lock,
1146                                    &stage_slave_waiting_worker_to_release_partition,
1147                                    &old_stage);
1148         mysql_cond_wait(&rli->slave_worker_hash_cond, &rli->slave_worker_hash_lock);
1149       } while (entry->usage != 0 && !thd->killed);
1150 
1151       mysql_mutex_unlock(&rli->slave_worker_hash_lock);
1152       thd->EXIT_COND(&old_stage);
1153       if (thd->killed)
1154       {
1155         entry= NULL;
1156         goto err;
1157       }
1158       mysql_mutex_lock(&rli->slave_worker_hash_lock);
1159       entry->usage= 1;
1160       entry->worker->usage_partition++;
1161     }
1162   }
1163 
1164   /*
1165      relocation belonging to db temporary tables from C to W via entry
1166   */
1167   if (entry->usage == 1 && need_temp_tables)
1168   {
1169     if (!entry->temporary_tables)
1170     {
1171       if (entry->db_len != 0)
1172       {
1173         move_temp_tables_to_entry(thd, entry);
1174       }
1175       else
1176       {
1177         entry->temporary_tables= thd->temporary_tables;
1178         thd->temporary_tables= NULL;
1179       }
1180     }
1181 #ifndef NDEBUG
1182     else
1183     {
1184       // all entries must have been emptied from temps by the caller
1185 
1186       for (TABLE *table= thd->temporary_tables; table; table= table->next)
1187       {
1188         assert(0 != strcmp(table->s->db.str, entry->db));
1189       }
1190     }
1191 #endif
1192   }
1193   mysql_mutex_unlock(&rli->slave_worker_hash_lock);
1194 
1195   assert(entry);
1196 
1197 err:
1198   if (entry)
1199   {
1200     DBUG_PRINT("info",
1201                ("Updating %s with worker %lu", entry->db, entry->worker->id));
1202     rli->curr_group_assigned_parts.push_back(entry);
1203     *ptr_entry= entry;
1204   }
1205   DBUG_RETURN(entry ? entry->worker : NULL);
1206 }
1207 
1208 /**
1209    Get the least occupied worker.
1210 
1211    @param ws  dynarray of pointers to Slave_worker
1212    @return a pointer to chosen Slave_worker instance
1213 
1214 */
get_least_occupied_worker(Relay_log_info * rli,Slave_worker_array * ws,Log_event * ev)1215 Slave_worker *get_least_occupied_worker(Relay_log_info *rli,
1216                                         Slave_worker_array *ws,
1217                                         Log_event* ev)
1218 {
1219   return rli->current_mts_submode->get_least_occupied_worker(rli, ws, ev);
1220 }
1221 
1222 /**
1223    Deallocation routine to cancel out few effects of
1224    @c map_db_to_worker().
1225    Involved into processing of the group APH tuples are updated.
1226    @c last_group_done_index member is set to the GAQ index of
1227    the current group.
1228    CGEP the Worker partition cache is cleaned up.
1229 
1230    @param ev     a pointer to Log_event
1231    @param error  error code after processing the event by caller.
1232 */
slave_worker_ends_group(Log_event * ev,int error)1233 void Slave_worker::slave_worker_ends_group(Log_event* ev, int error)
1234 {
1235   DBUG_ENTER("Slave_worker::slave_worker_ends_group");
1236   Slave_job_group *ptr_g= NULL;
1237 
1238   if (!error)
1239   {
1240     ptr_g= c_rli->gaq->get_job_group(gaq_index);
1241 
1242     assert(gaq_index == ev->mts_group_idx);
1243     /*
1244       It guarantees that the worker is removed from order commit queue when
1245       its transaction doesn't binlog anything. It will break innodb group commit,
1246       but it should rarely happen.
1247     */
1248     if (get_commit_order_manager())
1249       get_commit_order_manager()->report_commit(this);
1250 
1251     // first ever group must have relay log name
1252     assert(last_group_done_index != c_rli->gaq->size ||
1253            ptr_g->group_relay_log_name != NULL);
1254     assert(ptr_g->worker_id == id);
1255 
1256     if (ev->get_type_code() != binary_log::XID_EVENT)
1257     {
1258       commit_positions(ev, ptr_g, false);
1259       DBUG_EXECUTE_IF("crash_after_commit_and_update_pos",
1260            sql_print_information("Crashing crash_after_commit_and_update_pos.");
1261            flush_info(TRUE);
1262            DBUG_SUICIDE();
1263       );
1264     }
1265 
1266     ptr_g->group_master_log_pos= get_group_master_log_pos();
1267     ptr_g->group_relay_log_pos= group_relay_log_pos;
1268     my_atomic_store32(&ptr_g->done, 1);
1269     last_group_done_index= gaq_index;
1270     last_groups_assigned_index= ptr_g->total_seqno;
1271     reset_gaq_index();
1272     groups_done++;
1273 
1274   }
1275   else
1276   {
1277     if (running_status != STOP_ACCEPTED)
1278     {
1279       // tagging as exiting so Coordinator won't be able synchronize with it
1280       mysql_mutex_lock(&jobs_lock);
1281       running_status= ERROR_LEAVING;
1282       mysql_mutex_unlock(&jobs_lock);
1283 
1284       /* Fatal error happens, it notifies the following transaction to rollback */
1285       if (get_commit_order_manager())
1286         get_commit_order_manager()->report_rollback(this);
1287 
1288       // Killing Coordinator to indicate eventual consistency error
1289       mysql_mutex_lock(&c_rli->info_thd->LOCK_thd_data);
1290       c_rli->info_thd->awake(THD::KILL_QUERY);
1291       mysql_mutex_unlock(&c_rli->info_thd->LOCK_thd_data);
1292     }
1293   }
1294 
1295   /*
1296     Cleanup relating to the last executed group regardless of error.
1297   */
1298   if (current_mts_submode->get_type() == MTS_PARALLEL_TYPE_DB_NAME)
1299   {
1300   for (size_t i= 0; i < curr_group_exec_parts.size(); i++)
1301   {
1302     db_worker_hash_entry *entry= curr_group_exec_parts[i];
1303 
1304     mysql_mutex_lock(&c_rli->slave_worker_hash_lock);
1305 
1306     assert(entry);
1307 
1308     entry->usage --;
1309 
1310     assert(entry->usage >= 0);
1311 
1312     if (entry->usage == 0)
1313     {
1314       usage_partition--;
1315       /*
1316         The detached entry's temp table list, possibly updated, remains
1317         with the entry at least until time Coordinator will deallocate it
1318         from the hash, that is either due to stop or extra size of the hash.
1319       */
1320       assert(usage_partition >= 0);
1321       assert(this->info_thd->temporary_tables == 0);
1322       assert(!entry->temporary_tables ||
1323              !entry->temporary_tables->prev);
1324 
1325       if (entry->worker != this) // Coordinator is waiting
1326       {
1327 #ifndef NDEBUG
1328         // TODO: open it! assert(usage_partition || !entry->worker->jobs.len);
1329 #endif
1330         DBUG_PRINT("info",
1331                    ("Notifying entry %p release by worker %lu", entry, this->id));
1332 
1333         mysql_cond_signal(&c_rli->slave_worker_hash_cond);
1334       }
1335     }
1336     else
1337       assert(usage_partition != 0);
1338 
1339     mysql_mutex_unlock(&c_rli->slave_worker_hash_lock);
1340   }
1341 
1342   curr_group_exec_parts.clear();
1343   curr_group_exec_parts.shrink_to_fit();
1344 
1345   if (error)
1346   {
1347     // Awakening Coordinator that could be waiting for entry release
1348     mysql_mutex_lock(&c_rli->slave_worker_hash_lock);
1349     mysql_cond_signal(&c_rli->slave_worker_hash_cond);
1350     mysql_mutex_unlock(&c_rli->slave_worker_hash_lock);
1351   }
1352   }
1353   else // not DB-type scheduler
1354   {
1355     assert(current_mts_submode->get_type() ==
1356            MTS_PARALLEL_TYPE_LOGICAL_CLOCK);
1357     /*
1358       Check if there're any waiter. If there're try incrementing lwm and
1359       signal to those who've got sasfied with the waiting condition.
1360 
1361       In a "good" "likely" execution branch the waiter set is expected
1362       to be empty. LWM is advanced by Coordinator asynchronously.
1363       Also lwm is advanced by a dependent Worker when it inserts its waiting
1364       request into the waiting list.
1365     */
1366     Mts_submode_logical_clock* mts_submode=
1367       static_cast<Mts_submode_logical_clock*>(c_rli->current_mts_submode);
1368     int64 min_child_waited_logical_ts=
1369       my_atomic_load64(&mts_submode->min_waited_timestamp);
1370 
1371     DBUG_EXECUTE_IF("slave_worker_ends_group_before_signal_lwm",
1372                     {
1373                       const char act[]= "now WAIT_FOR worker_continue";
1374                       assert(!debug_sync_set_action(current_thd,
1375                                                     STRING_WITH_LEN(act)));
1376                     });
1377 
1378     if (unlikely(error))
1379     {
1380       mysql_mutex_lock(&c_rli->mts_gaq_LOCK);
1381       mts_submode->is_error= true;
1382       if (mts_submode->min_waited_timestamp != SEQ_UNINIT)
1383         mysql_cond_signal(&c_rli->logical_clock_cond);
1384       mysql_mutex_unlock(&c_rli->mts_gaq_LOCK);
1385     }
1386     else if (min_child_waited_logical_ts != SEQ_UNINIT)
1387     {
1388       mysql_mutex_lock(&c_rli->mts_gaq_LOCK);
1389 
1390       /*
1391         min_child_waited_logical_ts may include an old value, so we need to
1392         check it again after getting the lock.
1393       */
1394       if (mts_submode->min_waited_timestamp != SEQ_UNINIT)
1395       {
1396         longlong curr_lwm= mts_submode->get_lwm_timestamp(c_rli, true);
1397 
1398         if (mts_submode->clock_leq(mts_submode->min_waited_timestamp, curr_lwm))
1399         {
1400           /*
1401             There's a transaction that depends on the current.
1402           */
1403           mysql_cond_signal(&c_rli->logical_clock_cond);
1404         }
1405       }
1406       mysql_mutex_unlock(&c_rli->mts_gaq_LOCK);
1407     }
1408 
1409 #ifndef NDEBUG
1410     curr_group_seen_sequence_number= false;
1411 #endif
1412   }
1413   curr_group_seen_gtid= curr_group_seen_begin= false;
1414 
1415   DBUG_VOID_RETURN;
1416 }
1417 
1418 
1419 /**
1420    two index comparision to determine which of the two
1421    is ordered first.
1422 
1423    @note   The caller makes sure the args are within the valid
1424            range, incl cases the queue is empty or full.
1425 
1426    @return TRUE  if the first arg identifies a queue entity ordered
1427                  after one defined by the 2nd arg,
1428            FALSE otherwise.
1429 */
1430 template <typename Element_type>
gt(ulong i,ulong k)1431 bool circular_buffer_queue<Element_type>::gt(ulong i, ulong k)
1432 {
1433   assert(i < size && k < size);
1434   assert(avail != entry);
1435 
1436   if (i >= entry)
1437     if (k >= entry)
1438       return i > k;
1439     else
1440       return FALSE;
1441   else
1442     if (k >= entry)
1443       return TRUE;
1444     else
1445       return i > k;
1446 }
1447 
1448 #ifndef NDEBUG
count_done(Relay_log_info * rli)1449 bool Slave_committed_queue::count_done(Relay_log_info* rli)
1450 {
1451   ulong i, k, cnt= 0;
1452 
1453   for (i= entry, k= 0; k < len; i= (i + 1) % size, k++)
1454   {
1455     Slave_job_group *ptr_g;
1456 
1457     ptr_g= &m_Q[i];
1458 
1459     if (ptr_g->worker_id != MTS_WORKER_UNDEF && ptr_g->done)
1460       cnt++;
1461   }
1462 
1463   assert(cnt <= size);
1464 
1465   DBUG_PRINT("mts", ("Checking if it can simulate a crash:"
1466              " mts_checkpoint_group %u counter %lu parallel slaves %lu\n",
1467              opt_mts_checkpoint_group, cnt, rli->slave_parallel_workers));
1468 
1469   return (cnt == (rli->slave_parallel_workers * opt_mts_checkpoint_group));
1470 }
1471 #endif
1472 
1473 
1474 /**
1475    The queue is processed from the head item by item
1476    to purge items representing committed groups.
1477    Progress in GAQ is assessed through comparision of GAQ index value
1478    with Worker's @c last_group_done_index.
1479    Purging breaks at a first discovered gap, that is an item
1480    that the assinged item->w_id'th Worker has not yet completed.
1481 
1482    The caller is supposed to be the checkpoint handler.
1483 
1484    A copy of the last discarded item containing
1485    the refreshed value of the committed low-water-mark is stored
1486    into @c lwm container member for further caller's processing.
1487    @c last_done is updated with the latest total_seqno for each Worker
1488    that was met during GAQ parse.
1489 
1490    @note dyn-allocated members of Slave_job_group such as
1491          group_relay_log_name as freed here.
1492 
1493    @return number of discarded items
1494 */
move_queue_head(Slave_worker_array * ws)1495 ulong Slave_committed_queue::move_queue_head(Slave_worker_array *ws)
1496 {
1497   DBUG_ENTER("Slave_committed_queue::move_queue_head");
1498   ulong i, cnt= 0;
1499 
1500   for (i= entry; i != avail && !empty(); cnt++, i= (i + 1) % size)
1501   {
1502     Slave_worker *w_i;
1503     Slave_job_group *ptr_g;
1504     char grl_name[FN_REFLEN];
1505 
1506 #ifndef NDEBUG
1507     if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
1508         cnt == opt_mts_checkpoint_period)
1509       DBUG_RETURN(cnt);
1510 #endif
1511 
1512     grl_name[0]= 0;
1513     ptr_g= &m_Q[i];
1514 
1515     /*
1516       The current job has not been processed or it was not
1517       even assigned, this means there is a gap.
1518     */
1519     if (ptr_g->worker_id == MTS_WORKER_UNDEF ||
1520         my_atomic_load32(&ptr_g->done) == 0)
1521       break; /* gap at i'th */
1522 
1523     /* Worker-id domain guard */
1524     compile_time_assert(MTS_WORKER_UNDEF > MTS_MAX_WORKERS);
1525 
1526     w_i= ws->at(ptr_g->worker_id);
1527 
1528     /*
1529       Memorizes the latest valid group_relay_log_name.
1530     */
1531     if (ptr_g->group_relay_log_name)
1532     {
1533       strcpy(grl_name, ptr_g->group_relay_log_name);
1534       my_free(ptr_g->group_relay_log_name);
1535       /*
1536         It is important to mark the field as freed.
1537       */
1538       ptr_g->group_relay_log_name= NULL;
1539     }
1540 
1541     /*
1542       Removes the job from the (G)lobal (A)ssigned (Q)ueue.
1543     */
1544     Slave_job_group g= Slave_job_group();
1545 #ifndef NDEBUG
1546     ulong ind=
1547 #endif
1548       de_queue(&g);
1549 
1550     /*
1551       Stores the memorized name into the result struct. Note that we
1552       take care of the pointer first and then copy the other elements
1553       by assigning the structures.
1554     */
1555     if (grl_name[0] != 0)
1556     {
1557       strcpy(lwm.group_relay_log_name, grl_name);
1558     }
1559     g.group_relay_log_name= lwm.group_relay_log_name;
1560     lwm= g;
1561 
1562     assert(ind == i);
1563     assert(!ptr_g->group_relay_log_name);
1564     assert(ptr_g->total_seqno == lwm.total_seqno);
1565 #ifndef NDEBUG
1566     {
1567       ulonglong l= last_done[w_i->id];
1568       /*
1569         There must be some progress otherwise we should have
1570         exit the loop earlier.
1571       */
1572       assert(l < ptr_g->total_seqno);
1573     }
1574 #endif
1575     /*
1576       This is used to calculate the last time each worker has
1577       processed events.
1578     */
1579     last_done[w_i->id]= ptr_g->total_seqno;
1580   }
1581 
1582   assert(cnt <= size);
1583 
1584   DBUG_RETURN(cnt);
1585 }
1586 
1587 /**
1588    Finds low-water mark of committed jobs in GAQ.
1589    That is an index below which all jobs are marked as done.
1590 
1591    Notice the first available index is returned when the queue
1592    does not have any incomplete jobs. That includes cases of
1593    the empty and the full of complete jobs queue.
1594    A mutex protecting from concurrent LWM change by
1595    move_queue_head() (by Coordinator) should be taken by the caller.
1596 
1597    @param arg_g [out]  a double pointer to Slave job descriptor item
1598                        last marked with done-as-true boolean.
1599    @param start_index  a GAQ index to start/resume searching.
1600                        Caller is to make sure the index points into
1601                        assigned (occupied) range of circular buffer of GAQ.
1602    @return             GAQ index of the last consecutive done job, or the GAQ
1603                        size when none is found.
1604 */
find_lwm(Slave_job_group ** arg_g,ulong start_index)1605 ulong Slave_committed_queue::find_lwm(Slave_job_group** arg_g,
1606                                       ulong start_index)
1607 {
1608   Slave_job_group *ptr_g= NULL;
1609   ulong i, k, cnt;
1610 
1611   assert(start_index <= size);
1612 
1613   if (empty())
1614     return size;
1615 
1616   /*
1617     Loop continuation condition relies on
1618     (TODO: assert it)
1619     the start_index being in the running range:
1620 
1621        start_index \in [entry, avail - 1].
1622 
1623     It satisfies any queue size including 1.
1624     It does not satisfy the empty queue case which is bailed out earlier above.
1625   */
1626   for (i= start_index, cnt= 0; cnt < len - (start_index + size - entry) % size;
1627        i= (i + 1) % size, cnt++)
1628   {
1629     ptr_g= &m_Q[i];
1630     if (my_atomic_load32(&ptr_g->done) == 0)
1631     {
1632       if (cnt == 0)
1633         return size;             // the first node of the queue is not done
1634       break;
1635     }
1636   }
1637   ptr_g= &m_Q[k= (i + size - 1) % size];
1638   *arg_g= ptr_g;
1639 
1640   return k;
1641 }
1642 
1643 /**
1644    Method should be executed at slave system stop to
1645    cleanup dynamically allocated items that remained as unprocessed
1646    by Coordinator and Workers in their regular execution course.
1647 */
free_dynamic_items()1648 void Slave_committed_queue::free_dynamic_items()
1649 {
1650   ulong i, k;
1651   for (i= entry, k= 0; k < len; i= (i + 1) % size, k++)
1652   {
1653     Slave_job_group *ptr_g= &m_Q[i];
1654     if (ptr_g->group_relay_log_name)
1655     {
1656       my_free(ptr_g->group_relay_log_name);
1657     }
1658     if (ptr_g->checkpoint_log_name)
1659     {
1660       my_free(ptr_g->checkpoint_log_name);
1661     }
1662     if (ptr_g->checkpoint_relay_log_name)
1663     {
1664       my_free(ptr_g->checkpoint_relay_log_name);
1665     }
1666     if (ptr_g->group_master_log_name)
1667     {
1668       my_free(ptr_g->group_master_log_name);
1669     }
1670   }
1671   assert((avail == size /* full */ || entry == size /* empty */) ||
1672          i == avail /* all occupied are processed */);
1673 }
1674 
1675 
do_report(loglevel level,int err_code,const char * msg,va_list args) const1676 void Slave_worker::do_report(loglevel level, int err_code, const char *msg,
1677                              va_list args) const
1678 {
1679   char buff_coord[MAX_SLAVE_ERRMSG];
1680   char buff_gtid[Gtid::MAX_TEXT_LENGTH + 1];
1681   const char* log_name= const_cast<Slave_worker*>(this)->get_master_log_name();
1682   ulonglong log_pos= const_cast<Slave_worker*>(this)->get_master_log_pos();
1683   bool is_group_replication_applier_channel=
1684     channel_map.is_group_replication_channel_name(c_rli->get_channel(), true);
1685   const Gtid_specification *gtid_next= &info_thd->variables.gtid_next;
1686   THD *thd= info_thd;
1687 
1688   gtid_next->to_string(global_sid_map, buff_gtid, true);
1689 
1690   if (level == ERROR_LEVEL && (!has_temporary_error(thd, err_code) ||
1691       thd->get_transaction()->cannot_safely_rollback(Transaction_ctx::SESSION)))
1692   {
1693     char coordinator_errmsg[MAX_SLAVE_ERRMSG];
1694     if (is_group_replication_applier_channel)
1695     {
1696       my_snprintf(coordinator_errmsg, MAX_SLAVE_ERRMSG,
1697                   "Coordinator stopped because there were error(s) in the "
1698                   "worker(s). "
1699                   "The most recent failure being: Worker %u failed executing "
1700                   "transaction '%s'. See error log and/or "
1701                   "performance_schema.replication_applier_status_by_worker "
1702                   "table for "
1703                   "more details about this failure or others, if any.",
1704                   internal_id, buff_gtid);
1705     }
1706     else
1707     {
1708       my_snprintf(coordinator_errmsg, MAX_SLAVE_ERRMSG,
1709                   "Coordinator stopped because there were error(s) in the "
1710                   "worker(s). "
1711                   "The most recent failure being: Worker %u failed executing "
1712                   "transaction '%s' at master log %s, end_log_pos %llu. "
1713                   "See error log and/or "
1714                   "performance_schema.replication_applier_status_by_worker "
1715                   "table for "
1716                   "more details about this failure or others, if any.",
1717                   internal_id, buff_gtid, log_name, log_pos);
1718     }
1719 
1720     /*
1721       We want to update the errors in coordinator as well as worker.
1722       The fill_coord_err_buf() function update the error number, message and
1723       timestamp fields. This function is different from va_report() as va_report()
1724       also logs the error message in the log apart from updating the error fields.
1725       So, the worker does the job of reporting the error in the log. We just make
1726       coordinator aware of the error.
1727     */
1728     c_rli->fill_coord_err_buf(level, err_code, coordinator_errmsg);
1729   }
1730 
1731   if (is_group_replication_applier_channel)
1732   {
1733     my_snprintf(buff_coord, sizeof(buff_coord),
1734                 "Worker %u failed executing transaction '%s'",
1735                 internal_id, buff_gtid);
1736   }
1737   else
1738   {
1739     my_snprintf(buff_coord, sizeof(buff_coord),
1740                 "Worker %u failed executing transaction '%s' at "
1741                 "master log %s, end_log_pos %llu",
1742                 internal_id, buff_gtid, log_name, log_pos);
1743   }
1744 
1745   /*
1746     Error reporting by the worker. The worker updates its error fields as well
1747     as reports the error in the log.
1748   */
1749   this->va_report(level, err_code, buff_coord, msg, args);
1750 }
1751 
operator new(size_t request)1752 void* Slave_worker::operator new(size_t request)
1753 {
1754   void* ptr;
1755   if (posix_memalign(&ptr, __alignof__(Slave_worker), sizeof(Slave_worker))) {
1756     throw std::bad_alloc();
1757   }
1758   return ptr;
1759 }
1760 
operator delete(void * ptr)1761 void Slave_worker::operator delete(void * ptr)
1762 {
1763   free(ptr);
1764 }
1765 
1766 #ifndef NDEBUG
may_have_timestamp(Log_event * ev)1767 static bool may_have_timestamp(Log_event *ev)
1768 {
1769   bool res= false;
1770 
1771   switch (ev->get_type_code())
1772   {
1773   case binary_log::QUERY_EVENT:
1774     res= true;
1775     break;
1776 
1777   case binary_log::GTID_LOG_EVENT:
1778     res= true;
1779     break;
1780 
1781   default:
1782     break;
1783   }
1784 
1785   return res;
1786 }
1787 
get_last_committed(Log_event * ev)1788 static int64 get_last_committed(Log_event *ev)
1789 {
1790   int64 res= SEQ_UNINIT;
1791 
1792   switch (ev->get_type_code())
1793   {
1794   case binary_log::GTID_LOG_EVENT:
1795     res= static_cast<Gtid_log_event*>(ev)->last_committed;
1796     break;
1797 
1798   default:
1799     break;
1800   }
1801 
1802   return res;
1803 }
1804 
get_sequence_number(Log_event * ev)1805 static int64 get_sequence_number(Log_event *ev)
1806 {
1807   int64 res= SEQ_UNINIT;
1808 
1809   switch (ev->get_type_code())
1810   {
1811   case binary_log::GTID_LOG_EVENT:
1812     res= static_cast<Gtid_log_event*>(ev)->sequence_number;
1813     break;
1814 
1815   default:
1816     break;
1817   }
1818 
1819   return res;
1820 }
1821 #endif
1822 
1823 /**
1824   MTS worker main routine.
1825   The worker thread loops in waiting for an event, executing it and
1826   fixing statistics counters.
1827 
1828   @param worker    a pointer to the assigned Worker struct
1829   @param rli       a pointer to Relay_log_info of Coordinator
1830                    to update statistics.
1831 
1832   @return 0 success
1833          -1 got killed or an error happened during appying
1834 */
slave_worker_exec_event(Log_event * ev)1835 int Slave_worker::slave_worker_exec_event(Log_event *ev)
1836 {
1837   Relay_log_info *rli= c_rli;
1838   THD *thd= info_thd;
1839   int ret= 0;
1840 
1841   DBUG_ENTER("slave_worker_exec_event");
1842 
1843   thd->server_id = ev->server_id;
1844   thd->set_time();
1845   thd->lex->set_current_select(0);
1846   if (!ev->common_header->when.tv_sec)
1847     ev->common_header->when.tv_sec= static_cast<long>(my_time(0));
1848   ev->thd= thd; // todo: assert because up to this point, ev->thd == 0
1849   ev->worker= this;
1850 
1851 #ifndef NDEBUG
1852   if (!is_mts_db_partitioned(rli) && may_have_timestamp(ev) &&
1853       !curr_group_seen_sequence_number)
1854   {
1855     curr_group_seen_sequence_number= true;
1856 
1857     longlong lwm_estimate= static_cast<Mts_submode_logical_clock*>
1858       (rli->current_mts_submode)->estimate_lwm_timestamp();
1859     int64 last_committed= get_last_committed(ev);
1860     int64 sequence_number= get_sequence_number(ev);
1861     /*
1862       The commit timestamp waiting condition:
1863 
1864         lwm_estimate < last_committed  <=>  last_committed  \not <= lwm_estimate
1865 
1866       must have been satisfied by Coordinator.
1867       The first scheduled transaction does not have to wait for anybody.
1868     */
1869     assert(rli->gaq->entry == ev->mts_group_idx ||
1870            Mts_submode_logical_clock::clock_leq(last_committed,
1871                                                 lwm_estimate));
1872     assert(lwm_estimate != SEQ_UNINIT || rli->gaq->entry == ev->mts_group_idx);
1873     /*
1874       The current transaction's timestamp can't be less that lwm.
1875     */
1876     assert(sequence_number == SEQ_UNINIT ||
1877            !Mts_submode_logical_clock::
1878            clock_leq(sequence_number,
1879                      static_cast<Mts_submode_logical_clock*>
1880                      (rli->current_mts_submode)->
1881                      estimate_lwm_timestamp()));
1882   }
1883 #endif
1884 
1885   // Address partioning only in database mode
1886   if (!is_gtid_event(ev) && is_mts_db_partitioned(rli))
1887   {
1888     if (ev->contains_partition_info(end_group_sets_max_dbs))
1889     {
1890       uint num_dbs= ev->mts_number_dbs();
1891 
1892       if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
1893         num_dbs= 1;
1894 
1895       assert(num_dbs > 0);
1896 
1897       for (uint k= 0; k < num_dbs; k++)
1898       {
1899         bool found= false;
1900 
1901         for (size_t i= 0; i < curr_group_exec_parts.size() && !found; i++)
1902         {
1903           found= curr_group_exec_parts[i] ==
1904             ev->mts_assigned_partitions[k];
1905         }
1906         if (!found)
1907         {
1908           /*
1909             notice, can't assert
1910             assert(ev->mts_assigned_partitions[k]->worker == worker);
1911             since entry could be marked as wanted by other worker.
1912           */
1913           curr_group_exec_parts.push_back(ev->mts_assigned_partitions[k]);
1914         }
1915       }
1916       end_group_sets_max_dbs= false;
1917     }
1918   }
1919 
1920   set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
1921   set_master_log_pos(static_cast<ulong>(ev->common_header->log_pos));
1922   set_gaq_index(ev->mts_group_idx);
1923   ret= ev->do_apply_event_worker(this);
1924 
1925   DBUG_EXECUTE_IF("after_executed_write_rows_event", {
1926     if (ev->get_type_code() == binary_log::WRITE_ROWS_EVENT) {
1927       static const char act[]= "now signal executed";
1928       assert(opt_debug_sync_timeout > 0);
1929       assert(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
1930     }
1931   };);
1932 
1933   DBUG_RETURN(ret);
1934 }
1935 
1936 /**
1937   Sleep for a given amount of seconds or until killed.
1938 
1939   @param seconds    The number of seconds to sleep.
1940 
1941   @retval True if the thread has been killed, false otherwise.
1942 */
1943 
worker_sleep(ulong seconds)1944 bool Slave_worker::worker_sleep(ulong seconds)
1945 {
1946   bool ret= false;
1947   struct timespec abstime;
1948   mysql_mutex_t *lock= &jobs_lock;
1949   mysql_cond_t *cond= &jobs_cond;
1950 
1951   /* Absolute system time at which the sleep time expires. */
1952   set_timespec(&abstime, seconds);
1953 
1954   mysql_mutex_lock(lock);
1955   info_thd->ENTER_COND(cond, lock, NULL, NULL);
1956 
1957   while (!(ret= info_thd->killed || running_status != RUNNING))
1958   {
1959     int error= mysql_cond_timedwait(cond, lock, &abstime);
1960     if (error == ETIMEDOUT || error == ETIME)
1961       break;
1962   }
1963 
1964   mysql_mutex_unlock(lock);
1965   info_thd->EXIT_COND(NULL);
1966   return ret;
1967 }
1968 
prepare_for_retry(Log_event & event)1969 void Slave_worker::prepare_for_retry(Log_event &event) {
1970   if (event.get_type_code() ==
1971       binary_log::ROWS_QUERY_LOG_EVENT)  // If a `Rows_query_log_event`, let the
1972                                          // event be disposed in the main worker
1973                                          // loop.
1974   {
1975     event.worker= this;
1976     this->rows_query_ev= NULL;
1977   }
1978 }
1979 
1980 /**
1981   It is called after an error happens. It checks if that is an temporary
1982   error and if the situation is allow to retry the transaction. Then it will
1983   retry the transaction if it is allowed. Retry policy and logic is similar to
1984   single-threaded slave.
1985 
1986   @param[in] start_relay_number The extension number of the relay log which
1987                includes the first event of the transaction.
1988   @param[in] start_relay_pos The offset of the transaction's first event.
1989 
1990   @param[in] end_relay_number The extension number of the relay log which
1991                includes the last event it should retry.
1992   @param[in] end_relay_pos The offset of the last event it should retry.
1993 
1994   @return false if succeeds, otherwise returns true.
1995 */
retry_transaction(uint start_relay_number,my_off_t start_relay_pos,uint end_relay_number,my_off_t end_relay_pos)1996 bool Slave_worker::retry_transaction(uint start_relay_number,
1997                                      my_off_t start_relay_pos,
1998                                      uint end_relay_number,
1999                                      my_off_t end_relay_pos)
2000 {
2001   THD *thd= info_thd;
2002   bool silent= false;
2003   Slave_worker::Retry_context_sentry cleaned_up(*this);
2004 
2005   DBUG_ENTER("Slave_worker::retry_transaction");
2006 
2007   if (slave_trans_retries == 0)
2008     DBUG_RETURN(true);
2009 
2010   do
2011   {
2012     /* Simulate a lock deadlock error */
2013     uint error= 0;
2014     cleaned_up= false;
2015 
2016     if (found_order_commit_deadlock())
2017     {
2018       /*
2019         This transaction was allowed to be executed in parallel with other that
2020         happened earlier according to binary log order. It was asked to be
2021         rolled back by the other transaction as it was holding a lock that is
2022         needed by the other transaction to progress, according to binary log
2023         order this configure a deadlock.
2024 
2025         At this point, this transaction *should* have no non-temporary errors.
2026 
2027         Having a non-temporary error may be a sign of:
2028 
2029         a) Slave has diverged from the master;
2030         b) There is an issue in the logical clock allowing a transaction to be
2031            applied in parallel with its dependencies (the two transactions are
2032            trying to change the same record in parallel).
2033 
2034         For (a), a retry of this transaction will produce the same error. For
2035         (b), this transaction might succeed upon retry, allowing the slave to
2036         progress without manual intervention, but it is a sign of problems in LC
2037         generation at the master.
2038 
2039         So, we will make the worker to retry this transaction only if there is
2040         no error or the error is a temporary error.
2041       */
2042       Diagnostics_area *da= thd->get_stmt_da();
2043       if (!da->is_error() ||
2044           has_temporary_error(thd, da->is_error() ? da->mysql_errno() : error,
2045                               &silent))
2046       {
2047         error= ER_LOCK_DEADLOCK;
2048         DBUG_EXECUTE_IF("simulate_exhausted_trans_retries",
2049                         { trans_retries = slave_trans_retries; };);
2050       }
2051     }
2052 
2053     if ((!has_temporary_error(thd, error, &silent) ||
2054          thd->get_transaction()->cannot_safely_rollback(
2055              Transaction_ctx::SESSION)) &&
2056         DBUG_EVALUATE_IF("error_on_rows_query_event_apply", false, true))
2057       DBUG_RETURN(true);
2058 
2059     if (trans_retries >= slave_trans_retries)
2060     {
2061       thd->is_fatal_error= 1;
2062       c_rli->report(
2063           ERROR_LEVEL,
2064           thd->is_error() ? thd->get_stmt_da()->mysql_errno() : error,
2065           "worker thread retried transaction %lu time(s) "
2066           "in vain, giving up. Consider raising the value of "
2067           "the slave_transaction_retries variable.", trans_retries);
2068       DBUG_RETURN(true);
2069     }
2070 
2071     DBUG_EXECUTE_IF("error_on_rows_query_event_apply", {
2072       if (c_rli->retried_trans == 2)
2073       {
2074         DBUG_SET("-d,error_on_rows_query_event_apply");
2075         std::string act("now SIGNAL end_retries_on_rows_query_event_apply");
2076         assert(!debug_sync_set_action(thd, act.data(), act.length()));
2077       }
2078       silent= true;
2079     };);
2080 
2081     if (!silent)
2082       trans_retries++;
2083 
2084     mysql_mutex_lock(&c_rli->data_lock);
2085     c_rli->retried_trans++;
2086     mysql_mutex_unlock(&c_rli->data_lock);
2087 
2088     cleaned_up.clean();
2089     worker_sleep(min<ulong>(trans_retries, MAX_SLAVE_RETRY_PAUSE));
2090 
2091   } while (read_and_apply_events(start_relay_number, start_relay_pos,
2092                                  end_relay_number, end_relay_pos));
2093   DBUG_RETURN(false);
2094 }
2095 
2096 /**
2097   Read events from relay logs and apply them.
2098 
2099   @param[in] start_relay_number The extension number of the relay log which
2100                includes the first event of the transaction.
2101   @param[in] start_relay_pos The offset of the transaction's first event.
2102 
2103   @param[in] end_relay_number The extension number of the relay log which
2104                includes the last event it should retry.
2105   @param[in] end_relay_pos The offset of the last event it should retry.
2106 
2107   @return false if succeeds, otherwise returns true.
2108 */
read_and_apply_events(uint start_relay_number,my_off_t start_relay_pos,uint end_relay_number,my_off_t end_relay_pos)2109 bool Slave_worker::read_and_apply_events(uint start_relay_number,
2110                                          my_off_t start_relay_pos,
2111                                          uint end_relay_number,
2112                                          my_off_t end_relay_pos)
2113 {
2114   DBUG_ENTER("Slave_worker::read_and_apply_events");
2115 
2116   Relay_log_info *rli= c_rli;
2117   IO_CACHE relay_io;
2118   char file_name[FN_REFLEN+1];
2119   uint file_number= start_relay_number;
2120   bool error= true;
2121   bool arrive_end= false;
2122 
2123   relay_log_number_to_name(start_relay_number, file_name);
2124 
2125   memset(&relay_io, 0, sizeof(IO_CACHE));
2126 
2127   while (!arrive_end)
2128   {
2129     Log_event *ev= NULL;
2130 
2131     if (!my_b_inited(&relay_io))
2132     {
2133       const char *errmsg;
2134 
2135       DBUG_PRINT("info", ("Open relay log %s", file_name));
2136 
2137       if (open_binlog_file(&relay_io, file_name, &errmsg) == -1)
2138       {
2139         sql_print_error("Failed to open relay log %s, error: %s", file_name,
2140                         errmsg);
2141         goto end;
2142       }
2143       // Search for Start_encryption_event. When relay log is encrypted the second
2144       // event (after Format_description_event) will be Start_encryption_event.
2145       for (uint i= 0; i < 2; i++)
2146       {
2147         ev= Log_event::read_log_event(&relay_io, NULL,
2148                                       rli->get_rli_description_event(),
2149                                       opt_slave_sql_verify_checksum);
2150 
2151         if (ev != NULL)
2152         {
2153           if (ev->get_type_code() == binary_log::START_ENCRYPTION_EVENT &&
2154               !rli->get_rli_description_event()->start_decryption(static_cast<Start_encryption_log_event*>(ev)))
2155           {
2156             delete ev;
2157             goto end;
2158             error= true;
2159           }
2160           delete ev;
2161         }
2162       }
2163       my_b_seek(&relay_io, start_relay_pos);
2164     }
2165 
2166     /* If it is the last event, then set arrive_end as true */
2167     arrive_end= (my_b_tell(&relay_io) == end_relay_pos &&
2168                  file_number == end_relay_number);
2169 
2170     ev= Log_event::read_log_event(&relay_io, NULL,
2171                                   rli->get_rli_description_event(),
2172                                   opt_slave_sql_verify_checksum);
2173     if (ev != NULL)
2174     {
2175       /* It is a event belongs to the transaction */
2176       if (!ev->is_mts_sequential_exec(rli->current_mts_submode->get_type() ==
2177                                       MTS_PARALLEL_TYPE_DB_NAME))
2178       {
2179         int ret= 0;
2180 
2181         ev->future_event_relay_log_pos= my_b_tell(&relay_io);
2182         ev->mts_group_idx= gaq_index;
2183 
2184         if (is_mts_db_partitioned(rli) && ev->contains_partition_info(true))
2185           assign_partition_db(ev);
2186 
2187         ret= slave_worker_exec_event(ev);
2188         if (ev->worker != NULL)
2189         {
2190           delete ev;
2191           ev= NULL;
2192         }
2193 
2194         if (ret != 0)
2195           goto end;
2196       }
2197       else
2198       {
2199         /*
2200           It is a Rotate_log_event, Format_description_log_event event or other
2201           type event doesn't belong to the transaction.
2202         */
2203         delete ev;
2204         ev= NULL;
2205       }
2206     }
2207     else
2208     {
2209       /*
2210         IO error happens if relay_io.error != 0, otherwise it arrives the
2211         end of the relay log
2212       */
2213       if (relay_io.error != 0)
2214       {
2215         sql_print_error("Error when worker read relay log events,"
2216                         "relay log name %s, position %llu",
2217                         rli->get_event_relay_log_name(), my_b_tell(&relay_io));
2218         goto end;
2219       }
2220 
2221       if (rli->relay_log.find_next_relay_log(file_name))
2222       {
2223         sql_print_error("Failed to find next relay log when retrying the "
2224                         "transaction, current relay log is %s", file_name);
2225         goto end;
2226       }
2227 
2228       file_number= relay_log_name_to_number(file_name);
2229 
2230       end_io_cache(&relay_io);
2231       mysql_file_close(relay_io.file, MYF(0));
2232       start_relay_pos= BIN_LOG_HEADER_SIZE;
2233     }
2234   }
2235 
2236   error= false;
2237 end:
2238   if (my_b_inited(&relay_io))
2239   {
2240     end_io_cache(&relay_io);
2241     mysql_file_close(relay_io.file, MYF(0));
2242   }
2243   DBUG_RETURN(error);
2244 }
2245 
2246 /*
2247   Find database entry from map_db_to_worker hash table.
2248  */
find_entry_from_db_map(const char * dbname,Relay_log_info * rli)2249 static db_worker_hash_entry *find_entry_from_db_map(const char *dbname,
2250                                                     Relay_log_info *rli)
2251 {
2252   db_worker_hash_entry *entry= NULL;
2253   my_hash_value_type hash_value;
2254   uchar dblength= (uint) strlen(dbname);
2255 
2256   hash_value= my_calc_hash(&rli->mapping_db_to_worker, (const uchar*) dbname,
2257                            dblength);
2258 
2259   mysql_mutex_lock(&rli->slave_worker_hash_lock);
2260 
2261   entry= (db_worker_hash_entry *)
2262     my_hash_search_using_hash_value(&rli->mapping_db_to_worker, hash_value,
2263                                     (uchar*) dbname, dblength);
2264 
2265   mysql_mutex_unlock(&rli->slave_worker_hash_lock);
2266   return entry;
2267 }
2268 
2269 /*
2270   Initialize Log_event::mts_assigned_partitions array. It is for transaction
2271   retry and is only called when retrying a transaction by workers.
2272 */
assign_partition_db(Log_event * ev)2273 void Slave_worker::assign_partition_db(Log_event *ev)
2274 {
2275   Mts_db_names mts_dbs;
2276   int i;
2277 
2278   ev->get_mts_dbs(&mts_dbs);
2279 
2280   if (mts_dbs.num == OVER_MAX_DBS_IN_EVENT_MTS)
2281     ev->mts_assigned_partitions[0]= find_entry_from_db_map("", c_rli);
2282   else
2283     for (i= 0; i < mts_dbs.num; i++)
2284       ev->mts_assigned_partitions[i]= find_entry_from_db_map(mts_dbs.name[i],
2285                                                              c_rli);
2286 }
2287 
2288 // returns the next available! (TODO: incompatible to circurla_buff method!!!)
en_queue(Slave_jobs_queue * jobs,Slave_job_item * item)2289 static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item)
2290 {
2291   if (jobs->avail == jobs->size)
2292   {
2293     assert(jobs->avail == jobs->m_Q.size());
2294     return -1;
2295   }
2296 
2297   // store
2298 
2299   jobs->m_Q[jobs->avail]= *item;
2300 
2301   // pre-boundary cond
2302   if (jobs->entry == jobs->size)
2303     jobs->entry= jobs->avail;
2304 
2305   jobs->avail= (jobs->avail + 1) % jobs->size;
2306   jobs->len++;
2307 
2308   // post-boundary cond
2309   if (jobs->avail == jobs->entry)
2310     jobs->avail= jobs->size;
2311   assert(jobs->avail == jobs->entry ||
2312          jobs->len == (jobs->avail >= jobs->entry) ?
2313          (jobs->avail - jobs->entry) : (jobs->size + jobs->avail - jobs->entry));
2314   return jobs->avail;
2315 }
2316 
Retry_context_sentry(Slave_worker & parent)2317 Slave_worker::Retry_context_sentry::Retry_context_sentry(Slave_worker &parent)
2318     : m_parent(parent), m_is_cleaned_up(false) {}
2319 
~Retry_context_sentry()2320 Slave_worker::Retry_context_sentry::~Retry_context_sentry() {
2321   this->clean();
2322 }
2323 
operator =(bool is_cleaned_up)2324 Slave_worker::Retry_context_sentry &Slave_worker::Retry_context_sentry::operator=(
2325     bool is_cleaned_up)
2326 {
2327   this->m_is_cleaned_up = is_cleaned_up;
2328   return (*this);
2329 }
2330 
clean()2331 void Slave_worker::Retry_context_sentry::clean() {
2332   if (!this->m_is_cleaned_up) {
2333     this->m_parent.cleanup_context(this->m_parent.info_thd, 1);
2334     this->m_parent.reset_order_commit_deadlock();
2335     this->m_is_cleaned_up = true;
2336   }
2337 }
2338 
2339 /**
2340    return the value of @c data member of the head of the queue.
2341 */
head_queue(Slave_jobs_queue * jobs,Slave_job_item * ret)2342 void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
2343 {
2344   if (jobs->entry == jobs->size)
2345   {
2346     assert(jobs->len == 0);
2347     ret->data= NULL;               // todo: move to caller
2348     return NULL;
2349   }
2350   *ret= jobs->m_Q[jobs->entry];
2351 
2352   assert(ret->data);         // todo: move to caller
2353 
2354   return ret;
2355 }
2356 
2357 
2358 /**
2359    return a job item through a struct which point is supplied via argument.
2360 */
de_queue(Slave_jobs_queue * jobs,Slave_job_item * ret)2361 Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
2362 {
2363   if (jobs->entry == jobs->size)
2364   {
2365     assert(jobs->len == 0);
2366     return NULL;
2367   }
2368   *ret= jobs->m_Q[jobs->entry];
2369   jobs->len--;
2370 
2371   // pre boundary cond
2372   if (jobs->avail == jobs->size)
2373     jobs->avail= jobs->entry;
2374   jobs->entry= (jobs->entry + 1) % jobs->size;
2375 
2376   // post boundary cond
2377   if (jobs->avail == jobs->entry)
2378     jobs->entry= jobs->size;
2379 
2380   assert(jobs->entry == jobs->size ||
2381          (jobs->len == (jobs->avail >= jobs->entry) ?
2382           (jobs->avail - jobs->entry) :
2383           (jobs->size + jobs->avail - jobs->entry)));
2384 
2385   return ret;
2386 }
2387 
2388 /**
2389    Coordinator enqueues a job item into a Worker private queue.
2390 
2391    @param job_item  a pointer to struct carrying a reference to an event
2392    @param worker    a pointer to the assigned Worker struct
2393    @param rli       a pointer to Relay_log_info of Coordinator
2394 
2395    @return false Success.
2396            true  Thread killed or worker stopped while waiting for
2397                  successful enqueue.
2398 */
append_item_to_jobs(slave_job_item * job_item,Slave_worker * worker,Relay_log_info * rli)2399 bool append_item_to_jobs(slave_job_item *job_item,
2400                          Slave_worker *worker, Relay_log_info *rli)
2401 {
2402   THD *thd= rli->info_thd;
2403   int ret= -1;
2404   size_t ev_size= job_item->data->common_header->data_written;
2405   ulonglong new_pend_size;
2406   PSI_stage_info old_stage;
2407 
2408   assert(thd == current_thd);
2409 
2410   mysql_mutex_lock(&rli->pending_jobs_lock);
2411   new_pend_size= rli->mts_pending_jobs_size + ev_size;
2412   bool big_event= (ev_size > rli->mts_pending_jobs_size_max);
2413   /*
2414     C waits basing on *data* sizes in the queues.
2415     If it is a big event (event size is greater than
2416     slave_pending_jobs_size_max but less than slave_max_allowed_packet),
2417     it will wait for all the jobs in the workers's queue to be
2418     completed. If it is normal event (event size is less than
2419     slave_pending_jobs_size_max), then it will wait for
2420     enough empty memory to keep the event in one of the workers's
2421     queue.
2422     NOTE: Receiver thread (I/O thread) is taking care of restricting
2423     the event size to slave_max_allowed_packet. If an event from
2424     the master is bigger than this value, IO thread will be stopped
2425     with error ER_NET_PACKET_TOO_LARGE.
2426   */
2427   while ( (!big_event && new_pend_size > rli->mts_pending_jobs_size_max)
2428           || (big_event && rli->mts_pending_jobs_size != 0 ))
2429   {
2430     rli->mts_wq_oversize= TRUE;
2431     rli->wq_size_waits_cnt++; // waiting due to the total size
2432     thd->ENTER_COND(&rli->pending_jobs_cond, &rli->pending_jobs_lock,
2433                     &stage_slave_waiting_worker_to_free_events, &old_stage);
2434     mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
2435     mysql_mutex_unlock(&rli->pending_jobs_lock);
2436     thd->EXIT_COND(&old_stage);
2437     if (thd->killed)
2438       return true;
2439     if (rli->wq_size_waits_cnt % 10 == 1)
2440       sql_print_information("Multi-threaded slave: Coordinator has waited "
2441                             "%lu times hitting slave_pending_jobs_size_max; "
2442                             "current event size = %zu.",
2443                             rli->wq_size_waits_cnt, ev_size);
2444     mysql_mutex_lock(&rli->pending_jobs_lock);
2445 
2446     new_pend_size= rli->mts_pending_jobs_size + ev_size;
2447   }
2448   rli->pending_jobs++;
2449   rli->mts_pending_jobs_size= new_pend_size;
2450   rli->mts_events_assigned++;
2451 
2452   mysql_mutex_unlock(&rli->pending_jobs_lock);
2453 
2454   /*
2455     Sleep unless there is an underrunning Worker and the current Worker
2456     queue is empty or filled lightly (not more than underrun level).
2457   */
2458   if (rli->mts_wq_underrun_w_id == MTS_WORKER_UNDEF &&
2459       worker->jobs.len > worker->underrun_level)
2460   {
2461     /*
2462       todo: experiment with weight to get a good approximation formula.
2463       Max possible nap time is choosen 1 ms.
2464       The bigger the excessive overrun counter the longer the nap.
2465     */
2466     ulong nap_weight= rli->mts_wq_excess_cnt + 1;
2467     /*
2468        Nap time is a product of a weight factor and the basic nap unit.
2469        The weight factor is proportional to the worker queues overrun excess
2470        counter. For example when there were only one overruning Worker
2471        the max nap_weight as 0.1 * worker->jobs.size would be
2472        about 1600 so the max nap time is approx 0.008 secs.
2473        Such value is not reachable because of min().
2474        Notice, granularity of sleep depends on the resolution of the software
2475        clock, High-Resolution Timer (HRT) configuration. Without HRT
2476        the precision of wake-up through @c select() may be greater or
2477        equal 1 ms. So don't expect the nap last a prescribed fraction of 1 ms
2478        in such case.
2479     */
2480     my_sleep(min<ulong>(1000, nap_weight * rli->mts_coordinator_basic_nap));
2481     rli->mts_wq_no_underrun_cnt++;
2482   }
2483 
2484   mysql_mutex_lock(&worker->jobs_lock);
2485 
2486   // possible WQ overfill
2487   while (worker->running_status == Slave_worker::RUNNING && !thd->killed &&
2488          (ret= en_queue(&worker->jobs, job_item)) == -1)
2489   {
2490     thd->ENTER_COND(&worker->jobs_cond, &worker->jobs_lock,
2491                     &stage_slave_waiting_worker_queue, &old_stage);
2492     worker->jobs.overfill= TRUE;
2493     worker->jobs.waited_overfill++;
2494     rli->mts_wq_overfill_cnt++;
2495     mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock);
2496     mysql_mutex_unlock(&worker->jobs_lock);
2497     thd->EXIT_COND(&old_stage);
2498 
2499     mysql_mutex_lock(&worker->jobs_lock);
2500   }
2501   if (ret != -1)
2502   {
2503     worker->curr_jobs++;
2504     if (worker->jobs.len == 1)
2505       mysql_cond_signal(&worker->jobs_cond);
2506 
2507     mysql_mutex_unlock(&worker->jobs_lock);
2508   }
2509   else
2510   {
2511     mysql_mutex_unlock(&worker->jobs_lock);
2512 
2513     mysql_mutex_lock(&rli->pending_jobs_lock);
2514     rli->pending_jobs--;                  // roll back of the prev incr
2515     rli->mts_pending_jobs_size -= ev_size;
2516     mysql_mutex_unlock(&rli->pending_jobs_lock);
2517   }
2518 
2519   return (-1 != ret ? false : true);
2520 }
2521 
2522 /**
2523   Remove a job item from the given workers job queue. It also updates related
2524   status.
2525 
2526   param[in] job_item The job item will be removed
2527   param[in] worker   The worker which job_item belongs to.
2528   param[in] rli      slave's relay log info object.
2529  */
remove_item_from_jobs(slave_job_item * job_item,Slave_worker * worker,Relay_log_info * rli)2530 static void remove_item_from_jobs(slave_job_item *job_item,
2531                                   Slave_worker *worker, Relay_log_info *rli)
2532 {
2533   Log_event *ev= job_item->data;
2534 
2535   mysql_mutex_lock(&worker->jobs_lock);
2536   de_queue(&worker->jobs, job_item);
2537   /* possible overfill */
2538   if (worker->jobs.len == worker->jobs.size - 1 &&
2539       worker->jobs.overfill == TRUE)
2540   {
2541     worker->jobs.overfill= false;
2542     // todo: worker->hungry_cnt++;
2543     mysql_cond_signal(&worker->jobs_cond);
2544   }
2545   mysql_mutex_unlock(&worker->jobs_lock);
2546 
2547   /* statistics */
2548 
2549   /* todo: convert to rwlock/atomic write */
2550   mysql_mutex_lock(&rli->pending_jobs_lock);
2551 
2552   rli->pending_jobs--;
2553   rli->mts_pending_jobs_size-= ev->common_header->data_written;
2554   assert(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);
2555 
2556   /*
2557     The positive branch is underrun: number of pending assignments
2558     is less than underrun level.
2559     Zero of jobs.len has to reset underrun w_id as the worker may get
2560     the next piece of assignement in a long time.
2561   */
2562   if (worker->underrun_level > worker->jobs.len && worker->jobs.len != 0)
2563   {
2564     rli->mts_wq_underrun_w_id= worker->id;
2565   } else if (rli->mts_wq_underrun_w_id == worker->id)
2566   {
2567     // reset only own marking
2568     rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF;
2569   }
2570 
2571   /*
2572     Overrun handling.
2573     Incrementing the Worker private and the total excess counter corresponding
2574     to number of events filled above the overrun_level.
2575     The increment amount to the total counter is a difference between
2576     the current and the previous private excess (worker->wq_overrun_cnt).
2577     When the current queue length drops below overrun_level the global
2578     counter is decremented, the local is reset.
2579   */
2580   if (worker->overrun_level < worker->jobs.len)
2581   {
2582     ulong last_overrun= worker->wq_overrun_cnt;
2583     ulong excess_delta;
2584 
2585     /* current overrun */
2586     worker->wq_overrun_cnt= worker->jobs.len - worker->overrun_level;
2587     excess_delta= worker->wq_overrun_cnt - last_overrun;
2588     worker->excess_cnt+= excess_delta;
2589     rli->mts_wq_excess_cnt+= excess_delta;
2590     rli->mts_wq_overrun_cnt++;  // statistics
2591 
2592     // guarding correctness of incrementing in case of the only one Worker
2593     assert(rli->workers.size() != 1 ||
2594            rli->mts_wq_excess_cnt == worker->wq_overrun_cnt);
2595   }
2596   else if (worker->excess_cnt > 0)
2597   {
2598     // When level drops below the total excess is decremented by the
2599     // value of the worker's contribution to the total excess.
2600     rli->mts_wq_excess_cnt-= worker->excess_cnt;
2601     worker->excess_cnt= 0;
2602     worker->wq_overrun_cnt= 0; // and the local is reset
2603 
2604     assert(rli->mts_wq_excess_cnt >= 0);
2605     assert(rli->mts_wq_excess_cnt == 0 || rli->workers.size() > 1);
2606 
2607   }
2608 
2609   /* coordinator can be waiting */
2610   if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max &&
2611       rli->mts_wq_oversize)  // TODO: unit/general test wq_oversize
2612   {
2613     rli->mts_wq_oversize= FALSE;
2614     mysql_cond_signal(&rli->pending_jobs_cond);
2615   }
2616 
2617   mysql_mutex_unlock(&rli->pending_jobs_lock);
2618 
2619   worker->events_done++;
2620 }
2621 /**
2622    Worker's routine to wait for a new assignement through
2623    @c append_item_to_jobs()
2624 
2625    @param worker    a pointer to the waiting Worker struct
2626    @param job_item  a pointer to struct carrying a reference to an event
2627 
2628    @return NULL failure or
2629            a-pointer to an item.
2630 */
pop_jobs_item(Slave_worker * worker,Slave_job_item * job_item)2631 struct slave_job_item* pop_jobs_item(Slave_worker *worker,
2632                                      Slave_job_item *job_item)
2633 {
2634   THD *thd= worker->info_thd;
2635 
2636   mysql_mutex_lock(&worker->jobs_lock);
2637 
2638   job_item->data= NULL;
2639   while (!job_item->data && !thd->killed &&
2640          (worker->running_status == Slave_worker::RUNNING ||
2641           worker->running_status == Slave_worker::STOP))
2642   {
2643     PSI_stage_info old_stage;
2644 
2645     if (set_max_updated_index_on_stop(worker, job_item))
2646       break;
2647     if (job_item->data == NULL)
2648     {
2649       worker->wq_empty_waits++;
2650       thd->ENTER_COND(&worker->jobs_cond, &worker->jobs_lock,
2651                                &stage_slave_waiting_event_from_coordinator,
2652                                &old_stage);
2653       mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock);
2654       mysql_mutex_unlock(&worker->jobs_lock);
2655       thd->EXIT_COND(&old_stage);
2656       mysql_mutex_lock(&worker->jobs_lock);
2657     }
2658   }
2659   if (job_item->data)
2660     worker->curr_jobs--;
2661 
2662   mysql_mutex_unlock(&worker->jobs_lock);
2663 
2664   thd_proc_info(worker->info_thd, "Executing event");
2665   return job_item;
2666 }
2667 
2668 /**
2669   Report a not yet reported error to the coordinator if necessary.
2670 
2671   All issues detected when applying binary log events are reported using
2672   rli->report(), but when an issue is not reported by the log event being
2673   applied, there is a workaround at handle_slave_sql() to report the issue
2674   also using rli->report() for the STS applier (or the MTS coordinator).
2675 
2676   This function implements the workaround for a MTS worker.
2677 
2678   @param worker the worker to be evaluated.
2679 */
report_error_to_coordinator(Slave_worker * worker)2680 void report_error_to_coordinator(Slave_worker *worker)
2681 {
2682   THD *thd= worker->info_thd;
2683   /*
2684     It is possible that the worker had failed to apply the event but
2685     did not reported about the failure using rli->report(). An example
2686     of such cases are failures caused by setting GTID_NEXT variable with
2687     an unsupported GTID mode (GTID_SET when GTID_MODE = OFF, anonymous
2688     GTID when GTID_MODE = ON).
2689   */
2690   if (thd->is_error())
2691   {
2692     char const *const errmsg= thd->get_stmt_da()->message_text();
2693     DBUG_PRINT("info",
2694                ("thd->get_stmt_da()->get_mysql_errno()=%d; "
2695                 "worker->last_error.number=%d",
2696                 thd->get_stmt_da()->mysql_errno(),
2697                 worker->last_error().number));
2698 
2699     if (worker->last_error().number == 0 &&
2700         /*
2701           When another worker that should commit before the current worker
2702           being evaluated has failed and the commit order should be preserved
2703           the current worker was asked to roll back and would stop with the
2704           ER_SLAVE_WORKER_STOPPED_PREVIOUS_THD_ERROR not yet reported to the
2705           coordinator. Reporting this error to the coordinator would be a
2706           mistake and would mask the real issue that lead to the MTS stop as
2707           the coordinator reports only the last error reported to it as the
2708           cause of the MTS failure.
2709 
2710           So, we should skip reporting the error if it was reported because
2711           the current transaction had to be rolled back by a failure in a
2712           previous transaction in the commit order while the current
2713           transaction was waiting to be committed.
2714         */
2715         thd->get_stmt_da()->mysql_errno() !=
2716         ER_SLAVE_WORKER_STOPPED_PREVIOUS_THD_ERROR)
2717     {
2718       /*
2719         This function is reporting an error which was not reported
2720         while executing exec_relay_log_event().
2721       */
2722       worker->report(ERROR_LEVEL, thd->get_stmt_da()->mysql_errno(),
2723                      "%s", errmsg);
2724     }
2725   }
2726 }
2727 
2728 /**
2729   apply one job group.
2730 
2731   @note the function maintains worker's CGEP and modifies APH, updates
2732         the current group item in GAQ via @c slave_worker_ends_group().
2733 
2734   param[in] worker the worker which calls it.
2735   param[in] rli    slave's relay log info object.
2736 
2737   return returns 0 if the group of jobs are applied successfully, otherwise
2738          returns an error code.
2739  */
slave_worker_exec_job_group(Slave_worker * worker,Relay_log_info * rli)2740 int slave_worker_exec_job_group(Slave_worker *worker, Relay_log_info *rli)
2741 {
2742   struct slave_job_item item= {NULL, 0, 0};
2743   struct slave_job_item *job_item= &item;
2744   THD *thd= worker->info_thd;
2745   bool seen_gtid= false;
2746   bool seen_begin= false;
2747   int error= 0;
2748   Log_event *ev= NULL;
2749   uint start_relay_number;
2750   my_off_t start_relay_pos;
2751 
2752   DBUG_ENTER("slave_worker_exec_job_group");
2753 
2754   if (unlikely(worker->trans_retries > 0))
2755     worker->trans_retries= 0;
2756 
2757   job_item= pop_jobs_item(worker, job_item);
2758   start_relay_number= job_item->relay_number;
2759   start_relay_pos= job_item->relay_pos;
2760 
2761   while (1)
2762   {
2763     Slave_job_group *ptr_g;
2764 
2765     if (unlikely(thd->killed || worker->running_status == Slave_worker::STOP_ACCEPTED))
2766     {
2767       assert(worker->running_status != Slave_worker::ERROR_LEAVING);
2768       // de-queueing and decrement counters is in the caller's exit branch
2769       error= -1;
2770       goto err;
2771     }
2772 
2773     ev= job_item->data;
2774     assert(ev != NULL);
2775     DBUG_PRINT("info", ("W_%lu <- job item: %p data: %p thd: %p",
2776                         worker->id, job_item, ev, thd));
2777     if (is_gtid_event(ev))
2778       seen_gtid= true;
2779     if (!seen_begin && ev->starts_group())
2780     {
2781       seen_begin= true; // The current group is started with B-event
2782       worker->end_group_sets_max_dbs= true;
2783     }
2784     set_timespec_nsec(&worker->ts_exec[0], 0); // pre-exec
2785     worker->stats_read_time += diff_timespec(&worker->ts_exec[0],
2786                                              &worker->ts_exec[1]);
2787     /* Adapting to possible new Format_description_log_event */
2788     ptr_g= rli->gaq->get_job_group(ev->mts_group_idx);
2789     if (ptr_g->new_fd_event)
2790     {
2791       worker->set_rli_description_event(ptr_g->new_fd_event);
2792       ptr_g->new_fd_event= NULL;
2793     }
2794 
2795     error= worker->slave_worker_exec_event(ev);
2796 
2797     set_timespec_nsec(&worker->ts_exec[1], 0); // pre-exec
2798     worker->stats_exec_time += diff_timespec(&worker->ts_exec[1],
2799                                              &worker->ts_exec[0]);
2800     if (error || worker->found_order_commit_deadlock())
2801     {
2802       worker->prepare_for_retry(*ev);
2803       error= worker->retry_transaction(start_relay_number, start_relay_pos,
2804                                        job_item->relay_number,
2805                                        job_item->relay_pos);
2806       if (error)
2807         goto err;
2808     }
2809     /*
2810       p-event or any other event of B-free (malformed) group can
2811       "commit" with logical clock scheduler. In that case worker id
2812       points to the only active "exclusive" Worker that processes such
2813       malformed group events one by one.
2814       WL#7592 refines the original assert disjunction formula
2815       with the final disjunct.
2816     */
2817     assert(seen_begin || is_gtid_event(ev) ||
2818            ev->get_type_code() == binary_log::QUERY_EVENT ||
2819            is_mts_db_partitioned(rli) || worker->id == 0 || seen_gtid);
2820 
2821     if (ev->ends_group() ||
2822         (!seen_begin && !is_gtid_event(ev) &&
2823          (ev->get_type_code() == binary_log::QUERY_EVENT ||
2824           /* break through by LC only in GTID off */
2825           (!seen_gtid && !is_mts_db_partitioned(rli)))))
2826       break;
2827 
2828     remove_item_from_jobs(job_item, worker, rli);
2829     /* The event will be used later if worker is NULL, so it is not freed */
2830     if (ev->worker != NULL)
2831       delete ev;
2832 
2833     job_item= pop_jobs_item(worker, job_item);
2834   }
2835 
2836   DBUG_PRINT("info", (" commits GAQ index %lu, last committed  %lu",
2837                       ev->mts_group_idx, worker->last_group_done_index));
2838   /* The group is applied successfully, so error should be 0 */
2839   worker->slave_worker_ends_group(ev, 0);
2840 
2841 #ifndef NDEBUG
2842   DBUG_PRINT("mts", ("Check_slave_debug_group worker %lu mts_checkpoint_group"
2843                      " %u processed %lu debug %d\n", worker->id, opt_mts_checkpoint_group,
2844                      worker->groups_done,
2845                      DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0)));
2846 
2847   if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
2848       opt_mts_checkpoint_group == worker->groups_done)
2849   {
2850     DBUG_PRINT("mts", ("Putting worker %lu in busy wait.", worker->id));
2851     while (true) my_sleep(6000000);
2852   }
2853 #endif
2854 
2855   remove_item_from_jobs(job_item, worker, rli);
2856   delete ev;
2857 
2858   DBUG_RETURN(0);
2859 err:
2860   if (error)
2861   {
2862     report_error_to_coordinator(worker);
2863     DBUG_PRINT("info", ("Worker %lu is exiting: killed %i, error %i, "
2864                         "running_status %d",
2865                         worker->id, thd->killed, thd->is_error(),
2866                         worker->running_status));
2867     worker->slave_worker_ends_group(ev, error); /* last done sets post exec */
2868   }
2869   DBUG_RETURN(error);
2870 }
2871 
get_for_channel_str(bool upper_case) const2872 const char* Slave_worker::get_for_channel_str(bool upper_case) const
2873 {
2874   return c_rli->get_for_channel_str(upper_case);
2875 }
2876 
get_table_pk_field_indexes()2877 const uint* Slave_worker::get_table_pk_field_indexes()
2878 {
2879   return info_slave_worker_table_pk_field_indexes;
2880 }
2881 
get_channel_field_index()2882 uint Slave_worker::get_channel_field_index()
2883 {
2884   return LINE_FOR_CHANNEL;
2885 }
2886