1 /* Copyright (c) 2011, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "debug_sync.h"
24 #include "rpl_rli_pdb.h"
25
26 #include "log.h" // sql_print_error
27 #include "rpl_slave_commit_order_manager.h" // Commit_order_manager
28 #include "rpl_msr.h" // for channel_map
29
30 #include "pfs_file_provider.h"
31 #include "mysql/psi/mysql_file.h"
32
#ifndef NDEBUG
/* Globals that exist only in debug builds (compiled out under NDEBUG). */
ulong w_rr = 0;
/*
  Incremented by Slave_worker::commit_positions() when the
  "mts_debug_concurrent_access" debug keyword is active.
*/
uint mts_debug_concurrent_access = 0;
#endif

/* NOTE(review): not referenced in the visible part of this file. */
#define HASH_DYNAMIC_INIT 4

using std::max;
using std::min;
42
43 /**
44 This function is called by both coordinator and workers.
45
46 Upon receiving the STOP command, the workers will identify a
47 maximum group index already executed (or under execution).
48
49 All groups whose index are below or equal to the maximum
50 group index will be applied by the workers before stopping.
51
52 The workers with groups above the maximum group index will
53 exit without applying these groups by setting their running
54 status to "STOP_ACCEPTED".
55
56 @param worker a pointer to the waiting Worker struct
57 @param job_item a pointer to struct carrying a reference to an event
58
59 @return true if STOP command gets accepted otherwise false is returned.
60 */
handle_slave_worker_stop(Slave_worker * worker,Slave_job_item * job_item)61 bool handle_slave_worker_stop(Slave_worker *worker,
62 Slave_job_item *job_item)
63 {
64 ulonglong group_index= 0;
65 Relay_log_info *rli= worker->c_rli;
66 mysql_mutex_lock(&rli->exit_count_lock);
67 /*
68 First, W calculates a group-"at-hands" index which is
69 either the currently read ev group index, or the last executed
70 group's one when the queue is empty.
71 */
72 group_index= (job_item->data)?
73 rli->gaq->get_job_group(job_item->data->mts_group_idx)->total_seqno:
74 worker->last_groups_assigned_index;
75
76 /*
77 The max updated index is being updated as long as
78 exit_counter permits. That's stopped with the final W's
79 increment of it.
80 */
81 if (!worker->exit_incremented)
82 {
83 if (rli->exit_counter < rli->slave_parallel_workers)
84 rli->max_updated_index = max(rli->max_updated_index, group_index);
85
86 ++rli->exit_counter;
87 worker->exit_incremented= true;
88 assert(!is_mts_worker(current_thd));
89 }
90 #ifndef NDEBUG
91 else
92 assert(is_mts_worker(current_thd));
93 #endif
94
95 /*
96 Now let's decide about the deferred exit to consider
97 the empty queue and the counter value reached
98 slave_parallel_workers.
99 */
100 if (!job_item->data)
101 {
102 worker->running_status= Slave_worker::STOP_ACCEPTED;
103 mysql_cond_signal(&worker->jobs_cond);
104 mysql_mutex_unlock(&rli->exit_count_lock);
105 return(true);
106 }
107 else if (rli->exit_counter == rli->slave_parallel_workers)
108 {
109 //over steppers should exit with accepting STOP
110 if (group_index > rli->max_updated_index)
111 {
112 worker->running_status= Slave_worker::STOP_ACCEPTED;
113 mysql_cond_signal(&worker->jobs_cond);
114 mysql_mutex_unlock(&rli->exit_count_lock);
115 return(true);
116 }
117 }
118 mysql_mutex_unlock(&rli->exit_count_lock);
119 return(false);
120 }
121
122 /**
123 This function is called by both coordinator and workers.
124 Both coordinator and workers contribute to max_updated_index.
125
126 @param worker a pointer to the waiting Worker struct
127 @param job_item a pointer to struct carrying a reference to an event
128
129 @return true if STOP command gets accepted otherwise false is returned.
130 */
set_max_updated_index_on_stop(Slave_worker * worker,Slave_job_item * job_item)131 bool set_max_updated_index_on_stop(Slave_worker *worker,
132 Slave_job_item *job_item)
133 {
134 head_queue(&worker->jobs, job_item);
135 if (worker->running_status == Slave_worker::STOP)
136 {
137 if (handle_slave_worker_stop(worker, job_item))
138 return true;
139 }
140 return false;
141 }
142
/*
  Names of the fields stored in the worker info repository.

  Update this list whenever a field is added to the worker slave info.
  For now it only serves to compute the number of fields
  (see Slave_worker::get_number_worker_fields()).
*/
const char *info_slave_worker_fields []=
{
  "id",

  /*
    What has been executed. Only group_master_log_name and
    group_master_log_pos are strictly required; the relay-log pair is
    redundant and kept to ease debugging.
  */
  "group_relay_log_name",
  "group_relay_log_pos",
  "group_master_log_name",
  "group_master_log_pos",

  /*
    What the worker knew about the coordinator when a job was assigned.
    Redundant as well, kept to ease debugging.
  */
  "checkpoint_relay_log_name",
  "checkpoint_relay_log_pos",
  "checkpoint_master_log_name",
  "checkpoint_master_log_pos",

  /* The greatest job, i.e. group, processed by a worker. */
  "checkpoint_seqno",

  /*
    Maximum number of jobs that can be assigned to a worker; needed in
    order to read the next entry.
  */
  "checkpoint_group_size",

  /* Bitmap identifying which jobs were processed by a worker. */
  "checkpoint_group_bitmap",

  /* Channel on which this worker is acting. */
  "channel_name"
};
189
190 /*
191 Number of records in the mts partition hash below which
192 entries with zero usage are tolerated so could be quickly
193 recycled.
194 */
195 const ulong mts_partition_hash_soft_max= 16;
196
/*
  Index values of some outstanding slots of info_slave_worker_fields.
  LINE_FOR_CHANNEL is the zero-based position of "channel_name".
*/
enum
{
  LINE_FOR_CHANNEL = 12
};
203
204 const uint info_slave_worker_table_pk_field_indexes []=
205 {
206 LINE_FOR_CHANNEL,
207 0,
208 };
209
Slave_worker(Relay_log_info * rli,PSI_mutex_key * param_key_info_run_lock,PSI_mutex_key * param_key_info_data_lock,PSI_mutex_key * param_key_info_sleep_lock,PSI_mutex_key * param_key_info_thd_lock,PSI_mutex_key * param_key_info_data_cond,PSI_mutex_key * param_key_info_start_cond,PSI_mutex_key * param_key_info_stop_cond,PSI_mutex_key * param_key_info_sleep_cond,uint param_id,const char * param_channel)210 Slave_worker::Slave_worker(Relay_log_info *rli
211 #ifdef HAVE_PSI_INTERFACE
212 ,PSI_mutex_key *param_key_info_run_lock,
213 PSI_mutex_key *param_key_info_data_lock,
214 PSI_mutex_key *param_key_info_sleep_lock,
215 PSI_mutex_key *param_key_info_thd_lock,
216 PSI_mutex_key *param_key_info_data_cond,
217 PSI_mutex_key *param_key_info_start_cond,
218 PSI_mutex_key *param_key_info_stop_cond,
219 PSI_mutex_key *param_key_info_sleep_cond
220 #endif
221 , uint param_id, const char *param_channel
222 )
223 : Relay_log_info(FALSE
224 #ifdef HAVE_PSI_INTERFACE
225 ,param_key_info_run_lock, param_key_info_data_lock,
226 param_key_info_sleep_lock, param_key_info_thd_lock,
227 param_key_info_data_cond, param_key_info_start_cond,
228 param_key_info_stop_cond, param_key_info_sleep_cond
229 #endif
230 , param_id + 1, param_channel, true
231 ),
232 c_rli(rli),
233 curr_group_exec_parts(key_memory_db_worker_hash_entry),
234 id(param_id),
235 checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0),
236 checkpoint_seqno(0), running_status(NOT_RUNNING), exit_incremented(false)
237 {
238 /*
239 In the future, it would be great if we use only one identifier.
240 So when factoring out this code, please, consider this.
241 */
242 assert(internal_id == id + 1);
243 checkpoint_relay_log_name[0]= 0;
244 checkpoint_master_log_name[0]= 0;
245
246 mysql_mutex_init(key_mutex_slave_parallel_worker, &jobs_lock,
247 MY_MUTEX_INIT_FAST);
248 mysql_cond_init(key_cond_slave_parallel_worker, &jobs_cond);
249 mysql_cond_init(key_cond_mts_gaq, &logical_clock_cond);
250 }
251
~Slave_worker()252 Slave_worker::~Slave_worker()
253 {
254 end_info();
255 if (jobs.inited_queue)
256 {
257 assert(jobs.m_Q.size() == jobs.size);
258 jobs.m_Q.clear();
259 }
260 mysql_mutex_destroy(&jobs_lock);
261 mysql_cond_destroy(&jobs_cond);
262 mysql_cond_destroy(&logical_clock_cond);
263 mysql_mutex_lock(&info_thd_lock);
264 info_thd= NULL;
265 mysql_mutex_unlock(&info_thd_lock);
266 set_rli_description_event(NULL);
267 }
268
269 /**
270 Method is executed by Coordinator at Worker startup time to initialize
271 members parly with values supplied by Coordinator through rli.
272
273 @param rli Coordinator's Relay_log_info pointer
274 @param i identifier of the Worker
275
276 @return 0 success
277 non-zero failure
278 */
init_worker(Relay_log_info * rli,ulong i)279 int Slave_worker::init_worker(Relay_log_info * rli, ulong i)
280 {
281 DBUG_ENTER("Slave_worker::init_worker");
282 assert(!rli->info_thd->is_error());
283
284 Slave_job_item empty= Slave_job_item();
285
286 c_rli= rli;
287 set_commit_order_manager(c_rli->get_commit_order_manager());
288
289 if (rli_init_info(false) ||
290 DBUG_EVALUATE_IF("inject_init_worker_init_info_fault", true, false))
291 DBUG_RETURN(1);
292
293 id= i;
294 curr_group_exec_parts.clear();
295 relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
296 checkpoint_notified= FALSE; // the same as above
297 master_log_change_notified= false;// W learns master log during 1st group exec
298 fd_change_notified= false; // W is to learn master FD version same as above
299 server_version= version_product(rli->slave_version_split);
300 bitmap_shifted= 0;
301 workers= c_rli->workers; // shallow copying is sufficient
302 wq_empty_waits= wq_size_waits_cnt= groups_done= events_done= curr_jobs= 0;
303 usage_partition= 0;
304 end_group_sets_max_dbs= false;
305 gaq_index= last_group_done_index= c_rli->gaq->size; // out of range
306 last_groups_assigned_index=0;
307 assert(!jobs.inited_queue);
308 jobs.avail= 0;
309 jobs.len= 0;
310 jobs.overfill= FALSE; // todo: move into Slave_jobs_queue constructor
311 jobs.waited_overfill= 0;
312 jobs.entry= jobs.size= c_rli->mts_slave_worker_queue_len_max;
313 jobs.inited_queue= true;
314 curr_group_seen_begin= curr_group_seen_gtid= false;
315 #ifndef NDEBUG
316 curr_group_seen_sequence_number= false;
317 #endif
318 jobs.m_Q.resize(jobs.size, empty);
319 assert(jobs.m_Q.size() == jobs.size);
320
321 wq_overrun_cnt= excess_cnt= 0;
322 underrun_level= (ulong) ((rli->mts_worker_underrun_level * jobs.size) / 100.0);
323 // overrun level is symmetric to underrun (as underrun to the full queue)
324 overrun_level= jobs.size - underrun_level;
325
326 /* create mts submode for each of the the workers. */
327 current_mts_submode=
328 (rli->channel_mts_submode == MTS_PARALLEL_TYPE_DB_NAME)?
329 (Mts_submode*) new Mts_submode_database():
330 (Mts_submode*) new Mts_submode_logical_clock();
331
332 //workers and coordinator must be of the same type
333 assert(rli->current_mts_submode->get_type() ==
334 current_mts_submode->get_type());
335
336 m_order_commit_deadlock= false;
337 DBUG_RETURN(0);
338 }
339
340 /**
341 A part of Slave worker iitializer that provides a
342 minimum context for MTS recovery.
343
344 @param is_gaps_collecting_phase
345
346 clarifies what state the caller
347 executes this method from. When it's @c true
348 that is @c mts_recovery_groups() and Worker should
349 restore the last session time info which is processed
350 to collect gaps that is not executed transactions (groups).
351 Such recovery Slave_worker intance is destroyed at the end of
352 @c mts_recovery_groups().
353 Whet it's @c false Slave_worker is initialized for the run time
354 nad should not read the last session time stale info.
355 Its info will be ultimately reset once all gaps are executed
356 to finish off recovery.
357
358 @return 0 on success, non-zero for a failure
359 */
rli_init_info(bool is_gaps_collecting_phase)360 int Slave_worker::rli_init_info(bool is_gaps_collecting_phase)
361 {
362 enum_return_check return_check= ERROR_CHECKING_REPOSITORY;
363
364 DBUG_ENTER("Slave_worker::rli_init_info");
365
366 if (inited)
367 DBUG_RETURN(0);
368
369 /*
370 Worker bitmap size depends on recovery mode.
371 If it is gaps collecting the bitmaps must be capable to accept
372 up to MTS_MAX_BITS_IN_GROUP of bits.
373 */
374 size_t num_bits= is_gaps_collecting_phase ?
375 MTS_MAX_BITS_IN_GROUP : c_rli->checkpoint_group;
376 /*
377 This checks if the repository was created before and thus there
378 will be values to be read. Please, do not move this call after
379 the handler->init_info().
380 */
381 return_check= check_info();
382 if (return_check == ERROR_CHECKING_REPOSITORY ||
383 (return_check == REPOSITORY_DOES_NOT_EXIST && is_gaps_collecting_phase))
384 goto err;
385
386 if (handler->init_info())
387 goto err;
388
389 bitmap_init(&group_executed, NULL, num_bits, FALSE);
390 bitmap_init(&group_shifted, NULL, num_bits, FALSE);
391
392 if (is_gaps_collecting_phase &&
393 (DBUG_EVALUATE_IF("mts_slave_worker_init_at_gaps_fails", true, false) ||
394 read_info(handler)))
395 {
396 bitmap_free(&group_executed);
397 bitmap_free(&group_shifted);
398 goto err;
399 }
400 inited= 1;
401
402 DBUG_RETURN(0);
403
404 err:
405 // todo: handler->end_info(uidx, nidx);
406 inited= 0;
407 sql_print_error("Error reading slave worker configuration");
408 DBUG_RETURN(1);
409 }
410
end_info()411 void Slave_worker::end_info()
412 {
413 DBUG_ENTER("Slave_worker::end_info");
414
415 if (!inited)
416 DBUG_VOID_RETURN;
417
418 if (handler)
419 handler->end_info();
420
421 if (inited)
422 {
423 bitmap_free(&group_executed);
424 bitmap_free(&group_shifted);
425 }
426 inited = 0;
427
428 DBUG_VOID_RETURN;
429 }
430
flush_info(const bool force)431 int Slave_worker::flush_info(const bool force)
432 {
433 DBUG_ENTER("Slave_worker::flush_info");
434
435 if (!inited)
436 DBUG_RETURN(0);
437
438 /*
439 We update the sync_period at this point because only here we
440 now that we are handling a Slave_worker. This needs to be
441 update every time we call flush because the option may be
442 dinamically set.
443 */
444 handler->set_sync_period(sync_relayloginfo_period);
445
446 if (write_info(handler))
447 goto err;
448
449 if (handler->flush_info(force))
450 goto err;
451
452 DBUG_RETURN(0);
453
454 err:
455 sql_print_error("Error writing slave worker configuration");
456 DBUG_RETURN(1);
457 }
458
read_info(Rpl_info_handler * from)459 bool Slave_worker::read_info(Rpl_info_handler *from)
460 {
461 DBUG_ENTER("Slave_worker::read_info");
462
463 ulong temp_group_relay_log_pos= 0;
464 char temp_group_master_log_name[FN_REFLEN];
465 ulong temp_group_master_log_pos= 0;
466 ulong temp_checkpoint_relay_log_pos= 0;
467 ulong temp_checkpoint_master_log_pos= 0;
468 ulong temp_checkpoint_seqno= 0;
469 ulong nbytes= 0;
470 uchar *buffer= (uchar *) group_executed.bitmap;
471 int temp_internal_id= 0;
472
473 if (from->prepare_info_for_read())
474 DBUG_RETURN(TRUE);
475
476 if (from->get_info(&temp_internal_id, 0) ||
477 from->get_info(group_relay_log_name,
478 sizeof(group_relay_log_name),
479 (char *) "") ||
480 from->get_info(&temp_group_relay_log_pos,
481 0UL) ||
482 from->get_info(temp_group_master_log_name,
483 sizeof(temp_group_master_log_name),
484 (char *) "") ||
485 from->get_info(&temp_group_master_log_pos,
486 0UL) ||
487 from->get_info(checkpoint_relay_log_name,
488 sizeof(checkpoint_relay_log_name),
489 (char *) "") ||
490 from->get_info(&temp_checkpoint_relay_log_pos,
491 0UL) ||
492 from->get_info(checkpoint_master_log_name,
493 sizeof(checkpoint_master_log_name),
494 (char *) "") ||
495 from->get_info(&temp_checkpoint_master_log_pos,
496 0UL) ||
497 from->get_info(&temp_checkpoint_seqno,
498 0UL) ||
499 from->get_info(&nbytes, 0UL) ||
500 from->get_info(buffer, (size_t) nbytes,
501 (uchar *) 0) ||
502 /* default is empty string */
503 from->get_info(channel, sizeof(channel),(char*)""))
504 DBUG_RETURN(TRUE);
505
506 assert(nbytes <= no_bytes_in_map(&group_executed));
507
508 internal_id=(uint) temp_internal_id;
509 group_relay_log_pos= temp_group_relay_log_pos;
510 set_group_master_log_name(temp_group_master_log_name);
511 set_group_master_log_pos(temp_group_master_log_pos);
512 checkpoint_relay_log_pos= temp_checkpoint_relay_log_pos;
513 checkpoint_master_log_pos= temp_checkpoint_master_log_pos;
514 checkpoint_seqno= temp_checkpoint_seqno;
515
516 DBUG_RETURN(FALSE);
517 }
518
519 /*
520 This function is used to make a copy of the worker object before we
521 destroy it while STOP SLAVE. This new object is then used to report the
522 worker status until next START SLAVE following which the new worker objetcs
523 will be used.
524 */
copy_values_for_PFS(ulong worker_id,en_running_state thd_running_status,THD * worker_thd,const Error & last_error,const Gtid_specification & gtid)525 void Slave_worker::copy_values_for_PFS(ulong worker_id,
526 en_running_state thd_running_status,
527 THD *worker_thd,
528 const Error &last_error,
529 const Gtid_specification >id)
530 {
531 id= worker_id;
532 running_status= thd_running_status;
533 info_thd= worker_thd;
534 m_last_error= last_error;
535 currently_executing_gtid= gtid;
536 }
537
set_info_search_keys(Rpl_info_handler * to)538 bool Slave_worker::set_info_search_keys(Rpl_info_handler *to)
539 {
540 DBUG_ENTER("Slave_worker::set_info_search_keys");
541
542 /* primary keys are Id and channel_name */
543 if(to->set_info(0, (int)internal_id ) || to->set_info(LINE_FOR_CHANNEL, channel))
544 DBUG_RETURN(TRUE);
545
546 DBUG_RETURN(FALSE);
547 }
548
write_info(Rpl_info_handler * to)549 bool Slave_worker::write_info(Rpl_info_handler *to)
550 {
551 DBUG_ENTER("Master_info::write_info");
552
553 ulong nbytes= (ulong) no_bytes_in_map(&group_executed);
554 uchar *buffer= (uchar*) group_executed.bitmap;
555 assert(nbytes <= (c_rli->checkpoint_group + 7) / 8);
556
557 if (to->prepare_info_for_write() ||
558 to->set_info((int) internal_id) ||
559 to->set_info(group_relay_log_name) ||
560 to->set_info((ulong) group_relay_log_pos) ||
561 to->set_info(get_group_master_log_name()) ||
562 to->set_info((ulong) get_group_master_log_pos()) ||
563 to->set_info(checkpoint_relay_log_name) ||
564 to->set_info((ulong) checkpoint_relay_log_pos) ||
565 to->set_info(checkpoint_master_log_name) ||
566 to->set_info((ulong) checkpoint_master_log_pos) ||
567 to->set_info((ulong) checkpoint_seqno) ||
568 to->set_info(nbytes) ||
569 to->set_info(buffer, (size_t) nbytes)||
570 to->set_info(channel))
571 DBUG_RETURN(TRUE);
572
573 DBUG_RETURN(FALSE);
574 }
575
576 /**
577 Clean up a part of Worker info table that is regarded in
578 in gaps collecting at recovery.
579 This worker won't contribute to recovery bitmap at future
580 slave restart (see @c mts_recovery_groups).
581
582 @retrun FALSE as success TRUE as failure
583 */
reset_recovery_info()584 bool Slave_worker::reset_recovery_info()
585 {
586 bool binlog_prot_acquired= false;
587
588 DBUG_ENTER("Slave_worker::reset_recovery_info");
589
590 if (info_thd && !info_thd->backup_binlog_lock.is_acquired())
591 {
592 const ulong timeout= info_thd->variables.lock_wait_timeout;
593
594 DBUG_PRINT("debug", ("Acquiring binlog protection lock"));
595
596 if (info_thd->backup_binlog_lock.acquire_protection(info_thd, MDL_EXPLICIT,
597 timeout))
598 DBUG_RETURN(true);
599
600 binlog_prot_acquired= true;
601 }
602
603 set_group_master_log_name("");
604 set_group_master_log_pos(0);
605
606 if (binlog_prot_acquired)
607 {
608 DBUG_PRINT("debug", ("Releasing binlog protection lock"));
609 info_thd->backup_binlog_lock.release_protection(info_thd);
610 }
611
612
613 DBUG_RETURN(flush_info(true));
614 }
615
get_number_worker_fields()616 size_t Slave_worker::get_number_worker_fields()
617 {
618 return sizeof(info_slave_worker_fields)/sizeof(info_slave_worker_fields[0]);
619 }
620
get_master_log_name()621 const char* Slave_worker::get_master_log_name()
622 {
623 Slave_job_group* ptr_g= c_rli->gaq->get_job_group(gaq_index);
624
625 return (ptr_g->checkpoint_log_name != NULL) ?
626 ptr_g->checkpoint_log_name : checkpoint_master_log_name;
627 }
628
commit_positions(Log_event * ev,Slave_job_group * ptr_g,bool force)629 bool Slave_worker::commit_positions(Log_event *ev, Slave_job_group* ptr_g, bool force)
630 {
631 bool binlog_prot_acquired= false;
632 DBUG_ENTER("Slave_worker::checkpoint_positions");
633
634 if (info_thd && !info_thd->backup_binlog_lock.is_acquired())
635 {
636 const ulong timeout= info_thd->variables.lock_wait_timeout;
637
638 DBUG_PRINT("debug", ("Acquiring binlog protection lock"));
639
640 if (info_thd->backup_binlog_lock.acquire_protection(info_thd, MDL_EXPLICIT,
641 timeout))
642 DBUG_RETURN(true);
643
644 binlog_prot_acquired= true;
645 }
646
647 /*
648 Initial value of checkpoint_master_log_name is learned from
649 group_master_log_name. The latter can be passed to Worker
650 at rare event of master binlog rotation.
651 This initialization is needed to provide to Worker info
652 on physical coordiates during execution of the very first group
653 after a rotation.
654 */
655 if (ptr_g->group_master_log_name != NULL)
656 {
657 set_group_master_log_name(ptr_g->group_master_log_name);
658 my_free(ptr_g->group_master_log_name);
659 ptr_g->group_master_log_name= NULL;
660 strmake(checkpoint_master_log_name, get_group_master_log_name(),
661 sizeof(checkpoint_master_log_name) - 1);
662 }
663 if (ptr_g->checkpoint_log_name != NULL)
664 {
665 strmake(checkpoint_relay_log_name, ptr_g->checkpoint_relay_log_name,
666 sizeof(checkpoint_relay_log_name) - 1);
667 checkpoint_relay_log_pos= ptr_g->checkpoint_relay_log_pos;
668 strmake(checkpoint_master_log_name, ptr_g->checkpoint_log_name,
669 sizeof(checkpoint_master_log_name) - 1);
670 checkpoint_master_log_pos= ptr_g->checkpoint_log_pos;
671
672 my_free(ptr_g->checkpoint_log_name);
673 ptr_g->checkpoint_log_name= NULL;
674 my_free(ptr_g->checkpoint_relay_log_name);
675 ptr_g->checkpoint_relay_log_name= NULL;
676
677 bitmap_copy(&group_shifted, &group_executed);
678 bitmap_clear_all(&group_executed);
679 for (uint pos= ptr_g->shifted; pos < c_rli->checkpoint_group; pos++)
680 {
681 if (bitmap_is_set(&group_shifted, pos))
682 bitmap_set_bit(&group_executed, pos - ptr_g->shifted);
683 }
684 }
685 /*
686 Extracts an updated relay-log name to store in Worker's rli.
687 */
688 if (ptr_g->group_relay_log_name)
689 {
690 assert(strlen(ptr_g->group_relay_log_name) + 1
691 <= sizeof(group_relay_log_name));
692 strmake(group_relay_log_name, ptr_g->group_relay_log_name,
693 sizeof(group_relay_log_name) - 1);
694 }
695
696 assert(ptr_g->checkpoint_seqno <= (c_rli->checkpoint_group - 1));
697
698 bitmap_set_bit(&group_executed, ptr_g->checkpoint_seqno);
699 checkpoint_seqno= ptr_g->checkpoint_seqno;
700 group_relay_log_pos= ev->future_event_relay_log_pos;
701 set_group_master_log_pos(ev->common_header->log_pos);
702
703 /*
704 Directly accessing c_rli->get_group_master_log_name() does not
705 represent a concurrency issue because the current code places
706 a synchronization point when master rotates.
707 */
708 set_group_master_log_name(c_rli->get_group_master_log_name());
709
710 DBUG_PRINT("mts", ("Committing worker-id %lu group master log pos %llu "
711 "group master log name %s checkpoint sequence number %lu.",
712 id, get_group_master_log_pos(),
713 get_group_master_log_name(), checkpoint_seqno));
714
715 DBUG_EXECUTE_IF("mts_debug_concurrent_access",
716 {
717 mts_debug_concurrent_access++;
718 };
719 );
720
721 if (binlog_prot_acquired)
722 {
723 DBUG_PRINT("debug", ("Releasing binlog protection lock"));
724 info_thd->backup_binlog_lock.release_protection(info_thd);
725 }
726
727 DBUG_RETURN(flush_info(force));
728 }
729
rollback_positions(Slave_job_group * ptr_g)730 void Slave_worker::rollback_positions(Slave_job_group* ptr_g)
731 {
732 if (!is_transactional())
733 {
734 bitmap_clear_bit(&group_executed, ptr_g->checkpoint_seqno);
735 flush_info(false);
736 }
737 }
738
/**
  Key extraction callback for the db-to-worker hash (it is the get_key
  argument that init_hash_workers() hands to my_hash_init()).

  @param record    a db_worker_hash_entry
  @param length    [out] length of the entry's database name
  @param not_used  ignored

  @return pointer to the entry's database name, which is the key
*/
extern "C" uchar *get_key(const uchar *record, size_t *length,
                          my_bool not_used MY_ATTRIBUTE((unused)))
{
  DBUG_ENTER("get_key");

  db_worker_hash_entry *hash_entry= (db_worker_hash_entry *) record;
  const char *db_name= hash_entry->db;

  *length= strlen(db_name);

  DBUG_PRINT("info", ("get_key %s, %d", db_name, (int) *length));

  DBUG_RETURN((uchar*) db_name);
}
751
752
free_entry(db_worker_hash_entry * entry)753 static void free_entry(db_worker_hash_entry *entry)
754 {
755 THD *c_thd= current_thd;
756
757 DBUG_ENTER("free_entry");
758
759 DBUG_PRINT("info", ("free_entry %s, %zu", entry->db, strlen(entry->db)));
760
761 assert(c_thd->system_thread == SYSTEM_THREAD_SLAVE_SQL);
762
763 /*
764 Although assert is correct valgrind senses entry->worker can be freed.
765
766 assert(entry->usage == 0 ||
767 !entry->worker || // last entry owner could have errored out
768 entry->worker->running_status != Slave_worker::RUNNING);
769 */
770
771 mts_move_temp_tables_to_thd(c_thd, entry->temporary_tables);
772 entry->temporary_tables= NULL;
773
774 my_free((void *) entry->db);
775 my_free(entry);
776
777 DBUG_VOID_RETURN;
778 }
779
init_hash_workers(Relay_log_info * rli)780 bool init_hash_workers(Relay_log_info *rli)
781 {
782 DBUG_ENTER("init_hash_workers");
783
784 rli->inited_hash_workers=
785 (my_hash_init(&rli->mapping_db_to_worker, &my_charset_bin,
786 0, 0, 0, get_key,
787 (my_hash_free_key) free_entry, 0,
788 key_memory_db_worker_hash_entry) == 0);
789 if (rli->inited_hash_workers)
790 {
791 mysql_mutex_init(key_mutex_slave_worker_hash, &rli->slave_worker_hash_lock,
792 MY_MUTEX_INIT_FAST);
793 mysql_cond_init(key_cond_slave_worker_hash, &rli->slave_worker_hash_cond);
794 }
795
796 DBUG_RETURN (!rli->inited_hash_workers);
797 }
798
destroy_hash_workers(Relay_log_info * rli)799 void destroy_hash_workers(Relay_log_info *rli)
800 {
801 DBUG_ENTER("destroy_hash_workers");
802 if (rli->inited_hash_workers)
803 {
804 my_hash_free(&rli->mapping_db_to_worker);
805 mysql_mutex_destroy(&rli->slave_worker_hash_lock);
806 mysql_cond_destroy(&rli->slave_worker_hash_cond);
807 rli->inited_hash_workers= false;
808 }
809
810 DBUG_VOID_RETURN;
811 }
812
813 /**
814 Relocating temporary table reference into @c entry's table list head.
815 Sources can be the coordinator's and the Worker's thd->temporary_tables.
816
817 @param table TABLE instance pointer
818 @param thd THD instance pointer of the source of relocation
819 @param entry db_worker_hash_entry instance pointer
820
821 @note thd->temporary_tables can become NULL
822
823 @return the pointer to a table following the unlinked
824 */
mts_move_temp_table_to_entry(TABLE * table,THD * thd,db_worker_hash_entry * entry)825 TABLE* mts_move_temp_table_to_entry(TABLE *table, THD *thd,
826 db_worker_hash_entry *entry)
827 {
828 TABLE *ret= table->next;
829
830 if (table->prev)
831 {
832 table->prev->next= table->next;
833 if (table->prev->next)
834 table->next->prev= table->prev;
835 }
836 else
837 {
838 /* removing the first item from the list */
839 assert(table == thd->temporary_tables);
840
841 thd->temporary_tables= table->next;
842 if (thd->temporary_tables)
843 table->next->prev= 0;
844 }
845 table->next= entry->temporary_tables;
846 table->prev= 0;
847 if (table->next)
848 table->next->prev= table;
849 entry->temporary_tables= table;
850
851 return ret;
852 }
853
854
855 /**
856 Relocation of the list of temporary tables to thd->temporary_tables.
857
858 @param thd THD instance pointer of the destination
859 @param temporary_tables
860 the source temporary_tables list
861
862 @note destroying references to the source list, if necessary,
863 is left to the caller.
864
865 @return the post-merge value of thd->temporary_tables.
866 */
mts_move_temp_tables_to_thd(THD * thd,TABLE * temporary_tables)867 TABLE* mts_move_temp_tables_to_thd(THD *thd, TABLE *temporary_tables)
868 {
869 DBUG_ENTER ("mts_move_temp_tables_to_thd");
870 TABLE *table= temporary_tables;
871 if (!table)
872 DBUG_RETURN(NULL);
873
874 // accept only if this is the start of the list.
875 assert(!table->prev);
876
877 // walk along the source list and associate the tables with thd
878 do
879 {
880 table->in_use= thd;
881 } while(table->next && (table= table->next));
882
883 // link the former list against the tail of the source list
884 if (thd->temporary_tables)
885 thd->temporary_tables->prev= table;
886 table->next= thd->temporary_tables;
887 thd->temporary_tables= temporary_tables;
888 DBUG_RETURN(thd->temporary_tables);
889 }
890
891 /**
892 Relocating references of temporary tables of a database
893 of the entry argument from THD into the entry.
894
895 @param thd THD pointer of the source temporary_tables list
896 @param entry a pointer to db_worker_hash_entry record
897 containing database descriptor and temporary_tables list.
898
899 */
move_temp_tables_to_entry(THD * thd,db_worker_hash_entry * entry)900 static void move_temp_tables_to_entry(THD* thd, db_worker_hash_entry* entry)
901 {
902 for (TABLE *table= thd->temporary_tables; table;)
903 {
904 if (strcmp(table->s->db.str, entry->db) == 0)
905 {
906 // table pointer is shifted inside the function
907 table= mts_move_temp_table_to_entry(table, thd, entry);
908 }
909 else
910 {
911 table= table->next;
912 }
913 }
914 }
915
916
917 /**
918 The function produces a reference to the struct of a Worker
919 that has been or will be engaged to process the @c dbname -keyed partition (D).
920 It checks a local to Coordinator CGAP list first and returns
921 @c last_assigned_worker when found (todo: assert).
922
923 Otherwise, the partition is appended to the current group list:
924
925 CGAP .= D
926
927 here .= is concatenate operation,
928 and a possible D's Worker id is searched in Assigned Partition Hash
929 (APH) that collects tuples (P, W_id, U, mutex, cond).
930 In case not found,
931
932 W_d := W_c unless W_c is NULL.
933
934 When W_c is NULL it is assigned to a least occupied as defined by
935 @c get_least_occupied_worker().
936
937 W_d := W_c := W_{least_occupied}
938
939 APH .= a new (D, W_d, 1)
940
941 In a case APH contains W_d == W_c, (assert U >= 1)
942
943 update APH set U++ where APH.P = D
944
945 The case APH contains a W_d != W_c != NULL assigned to D-partition represents
946 the hashing conflict and is handled as the following:
947
948 a. marks the record of APH with a flag requesting to signal in the
949 cond var when `U' the usage counter drops to zero by the other Worker;
950 b. waits for the other Worker to finish tasks on that partition and
951 gets the signal;
952 c. updates the APH record to point to the first Worker (naturally, U := 1),
953 scheduled the event, and goes back into the parallel mode
954
955 @param dbname pointer to c-string containing database name
956 It can be empty string to indicate specific locking
957 to faciliate sequential applying.
958 @param rli pointer to Coordinators relay-log-info instance
959 @param ptr_entry reference to a pointer to the resulted entry in
960 the Assigne Partition Hash where
961 the entry's pointer is stored at return.
962 @param need_temp_tables
963 if FALSE migration of temporary tables not needed
964 @param last_worker caller opts for this Worker, it must be
965 rli->last_assigned_worker if one is determined.
966
967 @note modifies CGAP, APH and unlinks @c dbname -keyd temporary tables
968 from C's thd->temporary_tables to move them into the entry record.
969
970 @return the pointer to a Worker struct
971 */
map_db_to_worker(const char * dbname,Relay_log_info * rli,db_worker_hash_entry ** ptr_entry,bool need_temp_tables,Slave_worker * last_worker)972 Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
973 db_worker_hash_entry **ptr_entry,
974 bool need_temp_tables, Slave_worker *last_worker)
975 {
976 Slave_worker_array *workers= &rli->workers;
977
978 THD *thd= rli->info_thd;
979
980 DBUG_ENTER("map_db_to_worker");
981
982 assert(!rli->last_assigned_worker ||
983 rli->last_assigned_worker == last_worker);
984 assert(is_mts_db_partitioned(rli));
985
986 if (!rli->inited_hash_workers)
987 DBUG_RETURN(NULL);
988
989 db_worker_hash_entry *entry= NULL;
990 my_hash_value_type hash_value;
991 size_t dblength= strlen(dbname);
992
993
994 // Search in CGAP
995 for (db_worker_hash_entry **it= rli->curr_group_assigned_parts.begin();
996 it != rli->curr_group_assigned_parts.end(); ++it)
997 {
998 entry= *it;
999 if ((uchar) entry->db_len != dblength)
1000 continue;
1001 else
1002 if (strncmp(entry->db, const_cast<char*>(dbname), dblength) == 0)
1003 {
1004 *ptr_entry= entry;
1005 DBUG_RETURN(last_worker);
1006 }
1007 }
1008
1009 DBUG_PRINT("info", ("Searching for %s, %zu", dbname, dblength));
1010
1011 hash_value= my_calc_hash(&rli->mapping_db_to_worker, (uchar*) dbname,
1012 dblength);
1013
1014 mysql_mutex_lock(&rli->slave_worker_hash_lock);
1015
1016 entry= (db_worker_hash_entry *)
1017 my_hash_search_using_hash_value(&rli->mapping_db_to_worker, hash_value,
1018 (uchar*) dbname, dblength);
1019 if (!entry)
1020 {
1021 /*
1022 The database name was not found which means that a worker never
1023 processed events from that database. In such case, we need to
1024 map the database to a worker my inserting an entry into the
1025 hash map.
1026 */
1027 my_bool ret;
1028 char *db= NULL;
1029
1030 mysql_mutex_unlock(&rli->slave_worker_hash_lock);
1031
1032 DBUG_PRINT("info", ("Inserting %s, %zu", dbname, dblength));
1033 /*
1034 Allocate an entry to be inserted and if the operation fails
1035 an error is returned.
1036 */
1037 if (!(db= (char *) my_malloc(key_memory_db_worker_hash_entry,
1038 dblength + 1, MYF(0))))
1039 goto err;
1040 if (!(entry= (db_worker_hash_entry *)
1041 my_malloc(key_memory_db_worker_hash_entry,
1042 sizeof(db_worker_hash_entry), MYF(0))))
1043 {
1044 my_free(db);
1045 goto err;
1046 }
1047 my_stpcpy(db, dbname);
1048 entry->db= db;
1049 entry->db_len= strlen(db);
1050 entry->usage= 1;
1051 entry->temporary_tables= NULL;
1052 /*
1053 Unless \exists the last assigned Worker, get a free worker based
1054 on a policy described in the function get_least_occupied_worker().
1055 */
1056 mysql_mutex_lock(&rli->slave_worker_hash_lock);
1057
1058 entry->worker= (!last_worker) ?
1059 get_least_occupied_worker(rli, workers, NULL) : last_worker;
1060 entry->worker->usage_partition++;
1061 if (rli->mapping_db_to_worker.records > mts_partition_hash_soft_max)
1062 {
1063 /*
1064 A dynamic array to store the mapping_db_to_worker hash elements
1065 that needs to be deleted, since deleting the hash entires while
1066 iterating over it is wrong.
1067 */
1068 Prealloced_array<db_worker_hash_entry*, HASH_DYNAMIC_INIT>
1069 hash_element(key_memory_db_worker_hash_entry);
1070 /*
1071 remove zero-usage (todo: rare or long ago scheduled) records.
1072 Store the element of the hash in a dynamic array after checking whether
1073 the usage of the hash entry is 0 or not. We later free it from the HASH.
1074 */
1075 for (uint i= 0; i < rli->mapping_db_to_worker.records; i++)
1076 {
1077 assert(!entry->temporary_tables || !entry->temporary_tables->prev);
1078 assert(!thd->temporary_tables || !thd->temporary_tables->prev);
1079
1080 db_worker_hash_entry *entry=
1081 (db_worker_hash_entry*) my_hash_element(&rli->mapping_db_to_worker, i);
1082
1083 if (entry->usage == 0)
1084 {
1085 mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
1086 entry->temporary_tables= NULL;
1087
1088 /* Push the element in the dynamic array*/
1089 hash_element.push_back(entry);
1090 }
1091 }
1092
1093 /* Delete the hash element based on the usage */
1094 for (size_t i=0 ; i < hash_element.size(); i++)
1095 {
1096 db_worker_hash_entry *temp_entry= hash_element[i];
1097 my_hash_delete(&rli->mapping_db_to_worker, (uchar*) temp_entry);
1098 }
1099 }
1100
1101 ret= my_hash_insert(&rli->mapping_db_to_worker, (uchar*) entry);
1102
1103 if (ret)
1104 {
1105 my_free(db);
1106 my_free(entry);
1107 entry= NULL;
1108 goto err;
1109 }
1110 DBUG_PRINT("info", ("Inserted %s, %zu", entry->db, strlen(entry->db)));
1111 }
1112 else
1113 {
1114 /* There is a record. Either */
1115 if (entry->usage == 0)
1116 {
1117 entry->worker= (!last_worker) ?
1118 get_least_occupied_worker(rli, workers, NULL) : last_worker;
1119 entry->worker->usage_partition++;
1120 entry->usage++;
1121 }
1122 else if (entry->worker == last_worker || !last_worker)
1123 {
1124
1125 assert(entry->worker);
1126
1127 entry->usage++;
1128 }
1129 else
1130 {
1131 // The case APH contains a W_d != W_c != NULL assigned to
1132 // D-partition represents
1133 // the hashing conflict and is handled as the following:
1134 PSI_stage_info old_stage;
1135
1136 assert(last_worker != NULL &&
1137 rli->curr_group_assigned_parts.size() > 0);
1138
1139 // future assignenment and marking at the same time
1140 entry->worker= last_worker;
1141 // loop while a user thread is stopping Coordinator gracefully
1142 do
1143 {
1144 thd->ENTER_COND(&rli->slave_worker_hash_cond,
1145 &rli->slave_worker_hash_lock,
1146 &stage_slave_waiting_worker_to_release_partition,
1147 &old_stage);
1148 mysql_cond_wait(&rli->slave_worker_hash_cond, &rli->slave_worker_hash_lock);
1149 } while (entry->usage != 0 && !thd->killed);
1150
1151 mysql_mutex_unlock(&rli->slave_worker_hash_lock);
1152 thd->EXIT_COND(&old_stage);
1153 if (thd->killed)
1154 {
1155 entry= NULL;
1156 goto err;
1157 }
1158 mysql_mutex_lock(&rli->slave_worker_hash_lock);
1159 entry->usage= 1;
1160 entry->worker->usage_partition++;
1161 }
1162 }
1163
1164 /*
1165 relocation belonging to db temporary tables from C to W via entry
1166 */
1167 if (entry->usage == 1 && need_temp_tables)
1168 {
1169 if (!entry->temporary_tables)
1170 {
1171 if (entry->db_len != 0)
1172 {
1173 move_temp_tables_to_entry(thd, entry);
1174 }
1175 else
1176 {
1177 entry->temporary_tables= thd->temporary_tables;
1178 thd->temporary_tables= NULL;
1179 }
1180 }
1181 #ifndef NDEBUG
1182 else
1183 {
1184 // all entries must have been emptied from temps by the caller
1185
1186 for (TABLE *table= thd->temporary_tables; table; table= table->next)
1187 {
1188 assert(0 != strcmp(table->s->db.str, entry->db));
1189 }
1190 }
1191 #endif
1192 }
1193 mysql_mutex_unlock(&rli->slave_worker_hash_lock);
1194
1195 assert(entry);
1196
1197 err:
1198 if (entry)
1199 {
1200 DBUG_PRINT("info",
1201 ("Updating %s with worker %lu", entry->db, entry->worker->id));
1202 rli->curr_group_assigned_parts.push_back(entry);
1203 *ptr_entry= entry;
1204 }
1205 DBUG_RETURN(entry ? entry->worker : NULL);
1206 }
1207
1208 /**
1209 Get the least occupied worker.
1210
1211 @param ws dynarray of pointers to Slave_worker
1212 @return a pointer to chosen Slave_worker instance
1213
1214 */
get_least_occupied_worker(Relay_log_info * rli,Slave_worker_array * ws,Log_event * ev)1215 Slave_worker *get_least_occupied_worker(Relay_log_info *rli,
1216 Slave_worker_array *ws,
1217 Log_event* ev)
1218 {
1219 return rli->current_mts_submode->get_least_occupied_worker(rli, ws, ev);
1220 }
1221
1222 /**
1223 Deallocation routine to cancel out few effects of
1224 @c map_db_to_worker().
1225 Involved into processing of the group APH tuples are updated.
1226 @c last_group_done_index member is set to the GAQ index of
1227 the current group.
1228 CGEP the Worker partition cache is cleaned up.
1229
1230 @param ev a pointer to Log_event
1231 @param error error code after processing the event by caller.
1232 */
slave_worker_ends_group(Log_event * ev,int error)1233 void Slave_worker::slave_worker_ends_group(Log_event* ev, int error)
1234 {
1235 DBUG_ENTER("Slave_worker::slave_worker_ends_group");
1236 Slave_job_group *ptr_g= NULL;
1237
1238 if (!error)
1239 {
1240 ptr_g= c_rli->gaq->get_job_group(gaq_index);
1241
1242 assert(gaq_index == ev->mts_group_idx);
1243 /*
1244 It guarantees that the worker is removed from order commit queue when
1245 its transaction doesn't binlog anything. It will break innodb group commit,
1246 but it should rarely happen.
1247 */
1248 if (get_commit_order_manager())
1249 get_commit_order_manager()->report_commit(this);
1250
1251 // first ever group must have relay log name
1252 assert(last_group_done_index != c_rli->gaq->size ||
1253 ptr_g->group_relay_log_name != NULL);
1254 assert(ptr_g->worker_id == id);
1255
1256 if (ev->get_type_code() != binary_log::XID_EVENT)
1257 {
1258 commit_positions(ev, ptr_g, false);
1259 DBUG_EXECUTE_IF("crash_after_commit_and_update_pos",
1260 sql_print_information("Crashing crash_after_commit_and_update_pos.");
1261 flush_info(TRUE);
1262 DBUG_SUICIDE();
1263 );
1264 }
1265
1266 ptr_g->group_master_log_pos= get_group_master_log_pos();
1267 ptr_g->group_relay_log_pos= group_relay_log_pos;
1268 my_atomic_store32(&ptr_g->done, 1);
1269 last_group_done_index= gaq_index;
1270 last_groups_assigned_index= ptr_g->total_seqno;
1271 reset_gaq_index();
1272 groups_done++;
1273
1274 }
1275 else
1276 {
1277 if (running_status != STOP_ACCEPTED)
1278 {
1279 // tagging as exiting so Coordinator won't be able synchronize with it
1280 mysql_mutex_lock(&jobs_lock);
1281 running_status= ERROR_LEAVING;
1282 mysql_mutex_unlock(&jobs_lock);
1283
1284 /* Fatal error happens, it notifies the following transaction to rollback */
1285 if (get_commit_order_manager())
1286 get_commit_order_manager()->report_rollback(this);
1287
1288 // Killing Coordinator to indicate eventual consistency error
1289 mysql_mutex_lock(&c_rli->info_thd->LOCK_thd_data);
1290 c_rli->info_thd->awake(THD::KILL_QUERY);
1291 mysql_mutex_unlock(&c_rli->info_thd->LOCK_thd_data);
1292 }
1293 }
1294
1295 /*
1296 Cleanup relating to the last executed group regardless of error.
1297 */
1298 if (current_mts_submode->get_type() == MTS_PARALLEL_TYPE_DB_NAME)
1299 {
1300 for (size_t i= 0; i < curr_group_exec_parts.size(); i++)
1301 {
1302 db_worker_hash_entry *entry= curr_group_exec_parts[i];
1303
1304 mysql_mutex_lock(&c_rli->slave_worker_hash_lock);
1305
1306 assert(entry);
1307
1308 entry->usage --;
1309
1310 assert(entry->usage >= 0);
1311
1312 if (entry->usage == 0)
1313 {
1314 usage_partition--;
1315 /*
1316 The detached entry's temp table list, possibly updated, remains
1317 with the entry at least until time Coordinator will deallocate it
1318 from the hash, that is either due to stop or extra size of the hash.
1319 */
1320 assert(usage_partition >= 0);
1321 assert(this->info_thd->temporary_tables == 0);
1322 assert(!entry->temporary_tables ||
1323 !entry->temporary_tables->prev);
1324
1325 if (entry->worker != this) // Coordinator is waiting
1326 {
1327 #ifndef NDEBUG
1328 // TODO: open it! assert(usage_partition || !entry->worker->jobs.len);
1329 #endif
1330 DBUG_PRINT("info",
1331 ("Notifying entry %p release by worker %lu", entry, this->id));
1332
1333 mysql_cond_signal(&c_rli->slave_worker_hash_cond);
1334 }
1335 }
1336 else
1337 assert(usage_partition != 0);
1338
1339 mysql_mutex_unlock(&c_rli->slave_worker_hash_lock);
1340 }
1341
1342 curr_group_exec_parts.clear();
1343 curr_group_exec_parts.shrink_to_fit();
1344
1345 if (error)
1346 {
1347 // Awakening Coordinator that could be waiting for entry release
1348 mysql_mutex_lock(&c_rli->slave_worker_hash_lock);
1349 mysql_cond_signal(&c_rli->slave_worker_hash_cond);
1350 mysql_mutex_unlock(&c_rli->slave_worker_hash_lock);
1351 }
1352 }
1353 else // not DB-type scheduler
1354 {
1355 assert(current_mts_submode->get_type() ==
1356 MTS_PARALLEL_TYPE_LOGICAL_CLOCK);
1357 /*
1358 Check if there're any waiter. If there're try incrementing lwm and
1359 signal to those who've got sasfied with the waiting condition.
1360
1361 In a "good" "likely" execution branch the waiter set is expected
1362 to be empty. LWM is advanced by Coordinator asynchronously.
1363 Also lwm is advanced by a dependent Worker when it inserts its waiting
1364 request into the waiting list.
1365 */
1366 Mts_submode_logical_clock* mts_submode=
1367 static_cast<Mts_submode_logical_clock*>(c_rli->current_mts_submode);
1368 int64 min_child_waited_logical_ts=
1369 my_atomic_load64(&mts_submode->min_waited_timestamp);
1370
1371 DBUG_EXECUTE_IF("slave_worker_ends_group_before_signal_lwm",
1372 {
1373 const char act[]= "now WAIT_FOR worker_continue";
1374 assert(!debug_sync_set_action(current_thd,
1375 STRING_WITH_LEN(act)));
1376 });
1377
1378 if (unlikely(error))
1379 {
1380 mysql_mutex_lock(&c_rli->mts_gaq_LOCK);
1381 mts_submode->is_error= true;
1382 if (mts_submode->min_waited_timestamp != SEQ_UNINIT)
1383 mysql_cond_signal(&c_rli->logical_clock_cond);
1384 mysql_mutex_unlock(&c_rli->mts_gaq_LOCK);
1385 }
1386 else if (min_child_waited_logical_ts != SEQ_UNINIT)
1387 {
1388 mysql_mutex_lock(&c_rli->mts_gaq_LOCK);
1389
1390 /*
1391 min_child_waited_logical_ts may include an old value, so we need to
1392 check it again after getting the lock.
1393 */
1394 if (mts_submode->min_waited_timestamp != SEQ_UNINIT)
1395 {
1396 longlong curr_lwm= mts_submode->get_lwm_timestamp(c_rli, true);
1397
1398 if (mts_submode->clock_leq(mts_submode->min_waited_timestamp, curr_lwm))
1399 {
1400 /*
1401 There's a transaction that depends on the current.
1402 */
1403 mysql_cond_signal(&c_rli->logical_clock_cond);
1404 }
1405 }
1406 mysql_mutex_unlock(&c_rli->mts_gaq_LOCK);
1407 }
1408
1409 #ifndef NDEBUG
1410 curr_group_seen_sequence_number= false;
1411 #endif
1412 }
1413 curr_group_seen_gtid= curr_group_seen_begin= false;
1414
1415 DBUG_VOID_RETURN;
1416 }
1417
1418
1419 /**
1420 two index comparision to determine which of the two
1421 is ordered first.
1422
1423 @note The caller makes sure the args are within the valid
1424 range, incl cases the queue is empty or full.
1425
1426 @return TRUE if the first arg identifies a queue entity ordered
1427 after one defined by the 2nd arg,
1428 FALSE otherwise.
1429 */
1430 template <typename Element_type>
gt(ulong i,ulong k)1431 bool circular_buffer_queue<Element_type>::gt(ulong i, ulong k)
1432 {
1433 assert(i < size && k < size);
1434 assert(avail != entry);
1435
1436 if (i >= entry)
1437 if (k >= entry)
1438 return i > k;
1439 else
1440 return FALSE;
1441 else
1442 if (k >= entry)
1443 return TRUE;
1444 else
1445 return i > k;
1446 }
1447
1448 #ifndef NDEBUG
count_done(Relay_log_info * rli)1449 bool Slave_committed_queue::count_done(Relay_log_info* rli)
1450 {
1451 ulong i, k, cnt= 0;
1452
1453 for (i= entry, k= 0; k < len; i= (i + 1) % size, k++)
1454 {
1455 Slave_job_group *ptr_g;
1456
1457 ptr_g= &m_Q[i];
1458
1459 if (ptr_g->worker_id != MTS_WORKER_UNDEF && ptr_g->done)
1460 cnt++;
1461 }
1462
1463 assert(cnt <= size);
1464
1465 DBUG_PRINT("mts", ("Checking if it can simulate a crash:"
1466 " mts_checkpoint_group %u counter %lu parallel slaves %lu\n",
1467 opt_mts_checkpoint_group, cnt, rli->slave_parallel_workers));
1468
1469 return (cnt == (rli->slave_parallel_workers * opt_mts_checkpoint_group));
1470 }
1471 #endif
1472
1473
1474 /**
1475 The queue is processed from the head item by item
1476 to purge items representing committed groups.
1477 Progress in GAQ is assessed through comparision of GAQ index value
1478 with Worker's @c last_group_done_index.
1479 Purging breaks at a first discovered gap, that is an item
1480 that the assinged item->w_id'th Worker has not yet completed.
1481
1482 The caller is supposed to be the checkpoint handler.
1483
1484 A copy of the last discarded item containing
1485 the refreshed value of the committed low-water-mark is stored
1486 into @c lwm container member for further caller's processing.
1487 @c last_done is updated with the latest total_seqno for each Worker
1488 that was met during GAQ parse.
1489
1490 @note dyn-allocated members of Slave_job_group such as
1491 group_relay_log_name as freed here.
1492
1493 @return number of discarded items
1494 */
move_queue_head(Slave_worker_array * ws)1495 ulong Slave_committed_queue::move_queue_head(Slave_worker_array *ws)
1496 {
1497 DBUG_ENTER("Slave_committed_queue::move_queue_head");
1498 ulong i, cnt= 0;
1499
1500 for (i= entry; i != avail && !empty(); cnt++, i= (i + 1) % size)
1501 {
1502 Slave_worker *w_i;
1503 Slave_job_group *ptr_g;
1504 char grl_name[FN_REFLEN];
1505
1506 #ifndef NDEBUG
1507 if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
1508 cnt == opt_mts_checkpoint_period)
1509 DBUG_RETURN(cnt);
1510 #endif
1511
1512 grl_name[0]= 0;
1513 ptr_g= &m_Q[i];
1514
1515 /*
1516 The current job has not been processed or it was not
1517 even assigned, this means there is a gap.
1518 */
1519 if (ptr_g->worker_id == MTS_WORKER_UNDEF ||
1520 my_atomic_load32(&ptr_g->done) == 0)
1521 break; /* gap at i'th */
1522
1523 /* Worker-id domain guard */
1524 compile_time_assert(MTS_WORKER_UNDEF > MTS_MAX_WORKERS);
1525
1526 w_i= ws->at(ptr_g->worker_id);
1527
1528 /*
1529 Memorizes the latest valid group_relay_log_name.
1530 */
1531 if (ptr_g->group_relay_log_name)
1532 {
1533 strcpy(grl_name, ptr_g->group_relay_log_name);
1534 my_free(ptr_g->group_relay_log_name);
1535 /*
1536 It is important to mark the field as freed.
1537 */
1538 ptr_g->group_relay_log_name= NULL;
1539 }
1540
1541 /*
1542 Removes the job from the (G)lobal (A)ssigned (Q)ueue.
1543 */
1544 Slave_job_group g= Slave_job_group();
1545 #ifndef NDEBUG
1546 ulong ind=
1547 #endif
1548 de_queue(&g);
1549
1550 /*
1551 Stores the memorized name into the result struct. Note that we
1552 take care of the pointer first and then copy the other elements
1553 by assigning the structures.
1554 */
1555 if (grl_name[0] != 0)
1556 {
1557 strcpy(lwm.group_relay_log_name, grl_name);
1558 }
1559 g.group_relay_log_name= lwm.group_relay_log_name;
1560 lwm= g;
1561
1562 assert(ind == i);
1563 assert(!ptr_g->group_relay_log_name);
1564 assert(ptr_g->total_seqno == lwm.total_seqno);
1565 #ifndef NDEBUG
1566 {
1567 ulonglong l= last_done[w_i->id];
1568 /*
1569 There must be some progress otherwise we should have
1570 exit the loop earlier.
1571 */
1572 assert(l < ptr_g->total_seqno);
1573 }
1574 #endif
1575 /*
1576 This is used to calculate the last time each worker has
1577 processed events.
1578 */
1579 last_done[w_i->id]= ptr_g->total_seqno;
1580 }
1581
1582 assert(cnt <= size);
1583
1584 DBUG_RETURN(cnt);
1585 }
1586
1587 /**
1588 Finds low-water mark of committed jobs in GAQ.
1589 That is an index below which all jobs are marked as done.
1590
1591 Notice the first available index is returned when the queue
1592 does not have any incomplete jobs. That includes cases of
1593 the empty and the full of complete jobs queue.
1594 A mutex protecting from concurrent LWM change by
1595 move_queue_head() (by Coordinator) should be taken by the caller.
1596
1597 @param arg_g [out] a double pointer to Slave job descriptor item
1598 last marked with done-as-true boolean.
1599 @param start_index a GAQ index to start/resume searching.
1600 Caller is to make sure the index points into
1601 assigned (occupied) range of circular buffer of GAQ.
1602 @return GAQ index of the last consecutive done job, or the GAQ
1603 size when none is found.
1604 */
find_lwm(Slave_job_group ** arg_g,ulong start_index)1605 ulong Slave_committed_queue::find_lwm(Slave_job_group** arg_g,
1606 ulong start_index)
1607 {
1608 Slave_job_group *ptr_g= NULL;
1609 ulong i, k, cnt;
1610
1611 assert(start_index <= size);
1612
1613 if (empty())
1614 return size;
1615
1616 /*
1617 Loop continuation condition relies on
1618 (TODO: assert it)
1619 the start_index being in the running range:
1620
1621 start_index \in [entry, avail - 1].
1622
1623 It satisfies any queue size including 1.
1624 It does not satisfy the empty queue case which is bailed out earlier above.
1625 */
1626 for (i= start_index, cnt= 0; cnt < len - (start_index + size - entry) % size;
1627 i= (i + 1) % size, cnt++)
1628 {
1629 ptr_g= &m_Q[i];
1630 if (my_atomic_load32(&ptr_g->done) == 0)
1631 {
1632 if (cnt == 0)
1633 return size; // the first node of the queue is not done
1634 break;
1635 }
1636 }
1637 ptr_g= &m_Q[k= (i + size - 1) % size];
1638 *arg_g= ptr_g;
1639
1640 return k;
1641 }
1642
1643 /**
1644 Method should be executed at slave system stop to
1645 cleanup dynamically allocated items that remained as unprocessed
1646 by Coordinator and Workers in their regular execution course.
1647 */
free_dynamic_items()1648 void Slave_committed_queue::free_dynamic_items()
1649 {
1650 ulong i, k;
1651 for (i= entry, k= 0; k < len; i= (i + 1) % size, k++)
1652 {
1653 Slave_job_group *ptr_g= &m_Q[i];
1654 if (ptr_g->group_relay_log_name)
1655 {
1656 my_free(ptr_g->group_relay_log_name);
1657 }
1658 if (ptr_g->checkpoint_log_name)
1659 {
1660 my_free(ptr_g->checkpoint_log_name);
1661 }
1662 if (ptr_g->checkpoint_relay_log_name)
1663 {
1664 my_free(ptr_g->checkpoint_relay_log_name);
1665 }
1666 if (ptr_g->group_master_log_name)
1667 {
1668 my_free(ptr_g->group_master_log_name);
1669 }
1670 }
1671 assert((avail == size /* full */ || entry == size /* empty */) ||
1672 i == avail /* all occupied are processed */);
1673 }
1674
1675
do_report(loglevel level,int err_code,const char * msg,va_list args) const1676 void Slave_worker::do_report(loglevel level, int err_code, const char *msg,
1677 va_list args) const
1678 {
1679 char buff_coord[MAX_SLAVE_ERRMSG];
1680 char buff_gtid[Gtid::MAX_TEXT_LENGTH + 1];
1681 const char* log_name= const_cast<Slave_worker*>(this)->get_master_log_name();
1682 ulonglong log_pos= const_cast<Slave_worker*>(this)->get_master_log_pos();
1683 bool is_group_replication_applier_channel=
1684 channel_map.is_group_replication_channel_name(c_rli->get_channel(), true);
1685 const Gtid_specification *gtid_next= &info_thd->variables.gtid_next;
1686 THD *thd= info_thd;
1687
1688 gtid_next->to_string(global_sid_map, buff_gtid, true);
1689
1690 if (level == ERROR_LEVEL && (!has_temporary_error(thd, err_code) ||
1691 thd->get_transaction()->cannot_safely_rollback(Transaction_ctx::SESSION)))
1692 {
1693 char coordinator_errmsg[MAX_SLAVE_ERRMSG];
1694 if (is_group_replication_applier_channel)
1695 {
1696 my_snprintf(coordinator_errmsg, MAX_SLAVE_ERRMSG,
1697 "Coordinator stopped because there were error(s) in the "
1698 "worker(s). "
1699 "The most recent failure being: Worker %u failed executing "
1700 "transaction '%s'. See error log and/or "
1701 "performance_schema.replication_applier_status_by_worker "
1702 "table for "
1703 "more details about this failure or others, if any.",
1704 internal_id, buff_gtid);
1705 }
1706 else
1707 {
1708 my_snprintf(coordinator_errmsg, MAX_SLAVE_ERRMSG,
1709 "Coordinator stopped because there were error(s) in the "
1710 "worker(s). "
1711 "The most recent failure being: Worker %u failed executing "
1712 "transaction '%s' at master log %s, end_log_pos %llu. "
1713 "See error log and/or "
1714 "performance_schema.replication_applier_status_by_worker "
1715 "table for "
1716 "more details about this failure or others, if any.",
1717 internal_id, buff_gtid, log_name, log_pos);
1718 }
1719
1720 /*
1721 We want to update the errors in coordinator as well as worker.
1722 The fill_coord_err_buf() function update the error number, message and
1723 timestamp fields. This function is different from va_report() as va_report()
1724 also logs the error message in the log apart from updating the error fields.
1725 So, the worker does the job of reporting the error in the log. We just make
1726 coordinator aware of the error.
1727 */
1728 c_rli->fill_coord_err_buf(level, err_code, coordinator_errmsg);
1729 }
1730
1731 if (is_group_replication_applier_channel)
1732 {
1733 my_snprintf(buff_coord, sizeof(buff_coord),
1734 "Worker %u failed executing transaction '%s'",
1735 internal_id, buff_gtid);
1736 }
1737 else
1738 {
1739 my_snprintf(buff_coord, sizeof(buff_coord),
1740 "Worker %u failed executing transaction '%s' at "
1741 "master log %s, end_log_pos %llu",
1742 internal_id, buff_gtid, log_name, log_pos);
1743 }
1744
1745 /*
1746 Error reporting by the worker. The worker updates its error fields as well
1747 as reports the error in the log.
1748 */
1749 this->va_report(level, err_code, buff_coord, msg, args);
1750 }
1751
operator new(size_t request)1752 void* Slave_worker::operator new(size_t request)
1753 {
1754 void* ptr;
1755 if (posix_memalign(&ptr, __alignof__(Slave_worker), sizeof(Slave_worker))) {
1756 throw std::bad_alloc();
1757 }
1758 return ptr;
1759 }
1760
operator delete(void * ptr)1761 void Slave_worker::operator delete(void * ptr)
1762 {
1763 free(ptr);
1764 }
1765
1766 #ifndef NDEBUG
may_have_timestamp(Log_event * ev)1767 static bool may_have_timestamp(Log_event *ev)
1768 {
1769 bool res= false;
1770
1771 switch (ev->get_type_code())
1772 {
1773 case binary_log::QUERY_EVENT:
1774 res= true;
1775 break;
1776
1777 case binary_log::GTID_LOG_EVENT:
1778 res= true;
1779 break;
1780
1781 default:
1782 break;
1783 }
1784
1785 return res;
1786 }
1787
get_last_committed(Log_event * ev)1788 static int64 get_last_committed(Log_event *ev)
1789 {
1790 int64 res= SEQ_UNINIT;
1791
1792 switch (ev->get_type_code())
1793 {
1794 case binary_log::GTID_LOG_EVENT:
1795 res= static_cast<Gtid_log_event*>(ev)->last_committed;
1796 break;
1797
1798 default:
1799 break;
1800 }
1801
1802 return res;
1803 }
1804
get_sequence_number(Log_event * ev)1805 static int64 get_sequence_number(Log_event *ev)
1806 {
1807 int64 res= SEQ_UNINIT;
1808
1809 switch (ev->get_type_code())
1810 {
1811 case binary_log::GTID_LOG_EVENT:
1812 res= static_cast<Gtid_log_event*>(ev)->sequence_number;
1813 break;
1814
1815 default:
1816 break;
1817 }
1818
1819 return res;
1820 }
1821 #endif
1822
1823 /**
1824 MTS worker main routine.
1825 The worker thread loops in waiting for an event, executing it and
1826 fixing statistics counters.
1827
1828 @param worker a pointer to the assigned Worker struct
1829 @param rli a pointer to Relay_log_info of Coordinator
1830 to update statistics.
1831
1832 @return 0 success
1833 -1 got killed or an error happened during appying
1834 */
slave_worker_exec_event(Log_event * ev)1835 int Slave_worker::slave_worker_exec_event(Log_event *ev)
1836 {
1837 Relay_log_info *rli= c_rli;
1838 THD *thd= info_thd;
1839 int ret= 0;
1840
1841 DBUG_ENTER("slave_worker_exec_event");
1842
1843 thd->server_id = ev->server_id;
1844 thd->set_time();
1845 thd->lex->set_current_select(0);
1846 if (!ev->common_header->when.tv_sec)
1847 ev->common_header->when.tv_sec= static_cast<long>(my_time(0));
1848 ev->thd= thd; // todo: assert because up to this point, ev->thd == 0
1849 ev->worker= this;
1850
1851 #ifndef NDEBUG
1852 if (!is_mts_db_partitioned(rli) && may_have_timestamp(ev) &&
1853 !curr_group_seen_sequence_number)
1854 {
1855 curr_group_seen_sequence_number= true;
1856
1857 longlong lwm_estimate= static_cast<Mts_submode_logical_clock*>
1858 (rli->current_mts_submode)->estimate_lwm_timestamp();
1859 int64 last_committed= get_last_committed(ev);
1860 int64 sequence_number= get_sequence_number(ev);
1861 /*
1862 The commit timestamp waiting condition:
1863
1864 lwm_estimate < last_committed <=> last_committed \not <= lwm_estimate
1865
1866 must have been satisfied by Coordinator.
1867 The first scheduled transaction does not have to wait for anybody.
1868 */
1869 assert(rli->gaq->entry == ev->mts_group_idx ||
1870 Mts_submode_logical_clock::clock_leq(last_committed,
1871 lwm_estimate));
1872 assert(lwm_estimate != SEQ_UNINIT || rli->gaq->entry == ev->mts_group_idx);
1873 /*
1874 The current transaction's timestamp can't be less that lwm.
1875 */
1876 assert(sequence_number == SEQ_UNINIT ||
1877 !Mts_submode_logical_clock::
1878 clock_leq(sequence_number,
1879 static_cast<Mts_submode_logical_clock*>
1880 (rli->current_mts_submode)->
1881 estimate_lwm_timestamp()));
1882 }
1883 #endif
1884
1885 // Address partioning only in database mode
1886 if (!is_gtid_event(ev) && is_mts_db_partitioned(rli))
1887 {
1888 if (ev->contains_partition_info(end_group_sets_max_dbs))
1889 {
1890 uint num_dbs= ev->mts_number_dbs();
1891
1892 if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
1893 num_dbs= 1;
1894
1895 assert(num_dbs > 0);
1896
1897 for (uint k= 0; k < num_dbs; k++)
1898 {
1899 bool found= false;
1900
1901 for (size_t i= 0; i < curr_group_exec_parts.size() && !found; i++)
1902 {
1903 found= curr_group_exec_parts[i] ==
1904 ev->mts_assigned_partitions[k];
1905 }
1906 if (!found)
1907 {
1908 /*
1909 notice, can't assert
1910 assert(ev->mts_assigned_partitions[k]->worker == worker);
1911 since entry could be marked as wanted by other worker.
1912 */
1913 curr_group_exec_parts.push_back(ev->mts_assigned_partitions[k]);
1914 }
1915 }
1916 end_group_sets_max_dbs= false;
1917 }
1918 }
1919
1920 set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
1921 set_master_log_pos(static_cast<ulong>(ev->common_header->log_pos));
1922 set_gaq_index(ev->mts_group_idx);
1923 ret= ev->do_apply_event_worker(this);
1924
1925 DBUG_EXECUTE_IF("after_executed_write_rows_event", {
1926 if (ev->get_type_code() == binary_log::WRITE_ROWS_EVENT) {
1927 static const char act[]= "now signal executed";
1928 assert(opt_debug_sync_timeout > 0);
1929 assert(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
1930 }
1931 };);
1932
1933 DBUG_RETURN(ret);
1934 }
1935
1936 /**
1937 Sleep for a given amount of seconds or until killed.
1938
1939 @param seconds The number of seconds to sleep.
1940
1941 @retval True if the thread has been killed, false otherwise.
1942 */
1943
worker_sleep(ulong seconds)1944 bool Slave_worker::worker_sleep(ulong seconds)
1945 {
1946 bool ret= false;
1947 struct timespec abstime;
1948 mysql_mutex_t *lock= &jobs_lock;
1949 mysql_cond_t *cond= &jobs_cond;
1950
1951 /* Absolute system time at which the sleep time expires. */
1952 set_timespec(&abstime, seconds);
1953
1954 mysql_mutex_lock(lock);
1955 info_thd->ENTER_COND(cond, lock, NULL, NULL);
1956
1957 while (!(ret= info_thd->killed || running_status != RUNNING))
1958 {
1959 int error= mysql_cond_timedwait(cond, lock, &abstime);
1960 if (error == ETIMEDOUT || error == ETIME)
1961 break;
1962 }
1963
1964 mysql_mutex_unlock(lock);
1965 info_thd->EXIT_COND(NULL);
1966 return ret;
1967 }
1968
prepare_for_retry(Log_event & event)1969 void Slave_worker::prepare_for_retry(Log_event &event) {
1970 if (event.get_type_code() ==
1971 binary_log::ROWS_QUERY_LOG_EVENT) // If a `Rows_query_log_event`, let the
1972 // event be disposed in the main worker
1973 // loop.
1974 {
1975 event.worker= this;
1976 this->rows_query_ev= NULL;
1977 }
1978 }
1979
/**
  It is called after an error happens. It checks whether the error is a
  temporary one and whether the situation allows the transaction to be
  retried. If so, it retries the transaction. Retry policy and logic are
  similar to those of the single-threaded slave.

  @param[in] start_relay_number The extension number of the relay log which
               includes the first event of the transaction.
  @param[in] start_relay_pos The offset of the transaction's first event.

  @param[in] end_relay_number The extension number of the relay log which
               includes the last event it should retry.
  @param[in] end_relay_pos The offset of the last event it should retry.

  @return false if succeeds, otherwise returns true.
*/
retry_transaction(uint start_relay_number,my_off_t start_relay_pos,uint end_relay_number,my_off_t end_relay_pos)1996 bool Slave_worker::retry_transaction(uint start_relay_number,
1997 my_off_t start_relay_pos,
1998 uint end_relay_number,
1999 my_off_t end_relay_pos)
2000 {
2001 THD *thd= info_thd;
2002 bool silent= false;
2003 Slave_worker::Retry_context_sentry cleaned_up(*this);
2004
2005 DBUG_ENTER("Slave_worker::retry_transaction");
2006
2007 if (slave_trans_retries == 0)
2008 DBUG_RETURN(true);
2009
2010 do
2011 {
2012 /* Simulate a lock deadlock error */
2013 uint error= 0;
2014 cleaned_up= false;
2015
2016 if (found_order_commit_deadlock())
2017 {
2018 /*
2019 This transaction was allowed to be executed in parallel with other that
2020 happened earlier according to binary log order. It was asked to be
2021 rolled back by the other transaction as it was holding a lock that is
2022 needed by the other transaction to progress, according to binary log
2023 order this configure a deadlock.
2024
2025 At this point, this transaction *should* have no non-temporary errors.
2026
2027 Having a non-temporary error may be a sign of:
2028
2029 a) Slave has diverged from the master;
2030 b) There is an issue in the logical clock allowing a transaction to be
2031 applied in parallel with its dependencies (the two transactions are
2032 trying to change the same record in parallel).
2033
2034 For (a), a retry of this transaction will produce the same error. For
2035 (b), this transaction might succeed upon retry, allowing the slave to
2036 progress without manual intervention, but it is a sign of problems in LC
2037 generation at the master.
2038
2039 So, we will make the worker to retry this transaction only if there is
2040 no error or the error is a temporary error.
2041 */
2042 Diagnostics_area *da= thd->get_stmt_da();
2043 if (!da->is_error() ||
2044 has_temporary_error(thd, da->is_error() ? da->mysql_errno() : error,
2045 &silent))
2046 {
2047 error= ER_LOCK_DEADLOCK;
2048 DBUG_EXECUTE_IF("simulate_exhausted_trans_retries",
2049 { trans_retries = slave_trans_retries; };);
2050 }
2051 }
2052
2053 if ((!has_temporary_error(thd, error, &silent) ||
2054 thd->get_transaction()->cannot_safely_rollback(
2055 Transaction_ctx::SESSION)) &&
2056 DBUG_EVALUATE_IF("error_on_rows_query_event_apply", false, true))
2057 DBUG_RETURN(true);
2058
2059 if (trans_retries >= slave_trans_retries)
2060 {
2061 thd->is_fatal_error= 1;
2062 c_rli->report(
2063 ERROR_LEVEL,
2064 thd->is_error() ? thd->get_stmt_da()->mysql_errno() : error,
2065 "worker thread retried transaction %lu time(s) "
2066 "in vain, giving up. Consider raising the value of "
2067 "the slave_transaction_retries variable.", trans_retries);
2068 DBUG_RETURN(true);
2069 }
2070
2071 DBUG_EXECUTE_IF("error_on_rows_query_event_apply", {
2072 if (c_rli->retried_trans == 2)
2073 {
2074 DBUG_SET("-d,error_on_rows_query_event_apply");
2075 std::string act("now SIGNAL end_retries_on_rows_query_event_apply");
2076 assert(!debug_sync_set_action(thd, act.data(), act.length()));
2077 }
2078 silent= true;
2079 };);
2080
2081 if (!silent)
2082 trans_retries++;
2083
2084 mysql_mutex_lock(&c_rli->data_lock);
2085 c_rli->retried_trans++;
2086 mysql_mutex_unlock(&c_rli->data_lock);
2087
2088 cleaned_up.clean();
2089 worker_sleep(min<ulong>(trans_retries, MAX_SLAVE_RETRY_PAUSE));
2090
2091 } while (read_and_apply_events(start_relay_number, start_relay_pos,
2092 end_relay_number, end_relay_pos));
2093 DBUG_RETURN(false);
2094 }
2095
2096 /**
2097 Read events from relay logs and apply them.
2098
2099 @param[in] start_relay_number The extension number of the relay log which
2100 includes the first event of the transaction.
2101 @param[in] start_relay_pos The offset of the transaction's first event.
2102
2103 @param[in] end_relay_number The extension number of the relay log which
2104 includes the last event it should retry.
2105 @param[in] end_relay_pos The offset of the last event it should retry.
2106
2107 @return false if succeeds, otherwise returns true.
2108 */
read_and_apply_events(uint start_relay_number,my_off_t start_relay_pos,uint end_relay_number,my_off_t end_relay_pos)2109 bool Slave_worker::read_and_apply_events(uint start_relay_number,
2110 my_off_t start_relay_pos,
2111 uint end_relay_number,
2112 my_off_t end_relay_pos)
2113 {
2114 DBUG_ENTER("Slave_worker::read_and_apply_events");
2115
2116 Relay_log_info *rli= c_rli;
2117 IO_CACHE relay_io;
2118 char file_name[FN_REFLEN+1];
2119 uint file_number= start_relay_number;
2120 bool error= true;
2121 bool arrive_end= false;
2122
2123 relay_log_number_to_name(start_relay_number, file_name);
2124
2125 memset(&relay_io, 0, sizeof(IO_CACHE));
2126
2127 while (!arrive_end)
2128 {
2129 Log_event *ev= NULL;
2130
2131 if (!my_b_inited(&relay_io))
2132 {
2133 const char *errmsg;
2134
2135 DBUG_PRINT("info", ("Open relay log %s", file_name));
2136
2137 if (open_binlog_file(&relay_io, file_name, &errmsg) == -1)
2138 {
2139 sql_print_error("Failed to open relay log %s, error: %s", file_name,
2140 errmsg);
2141 goto end;
2142 }
2143 // Search for Start_encryption_event. When relay log is encrypted the second
2144 // event (after Format_description_event) will be Start_encryption_event.
2145 for (uint i= 0; i < 2; i++)
2146 {
2147 ev= Log_event::read_log_event(&relay_io, NULL,
2148 rli->get_rli_description_event(),
2149 opt_slave_sql_verify_checksum);
2150
2151 if (ev != NULL)
2152 {
2153 if (ev->get_type_code() == binary_log::START_ENCRYPTION_EVENT &&
2154 !rli->get_rli_description_event()->start_decryption(static_cast<Start_encryption_log_event*>(ev)))
2155 {
2156 delete ev;
2157 goto end;
2158 error= true;
2159 }
2160 delete ev;
2161 }
2162 }
2163 my_b_seek(&relay_io, start_relay_pos);
2164 }
2165
2166 /* If it is the last event, then set arrive_end as true */
2167 arrive_end= (my_b_tell(&relay_io) == end_relay_pos &&
2168 file_number == end_relay_number);
2169
2170 ev= Log_event::read_log_event(&relay_io, NULL,
2171 rli->get_rli_description_event(),
2172 opt_slave_sql_verify_checksum);
2173 if (ev != NULL)
2174 {
2175 /* It is a event belongs to the transaction */
2176 if (!ev->is_mts_sequential_exec(rli->current_mts_submode->get_type() ==
2177 MTS_PARALLEL_TYPE_DB_NAME))
2178 {
2179 int ret= 0;
2180
2181 ev->future_event_relay_log_pos= my_b_tell(&relay_io);
2182 ev->mts_group_idx= gaq_index;
2183
2184 if (is_mts_db_partitioned(rli) && ev->contains_partition_info(true))
2185 assign_partition_db(ev);
2186
2187 ret= slave_worker_exec_event(ev);
2188 if (ev->worker != NULL)
2189 {
2190 delete ev;
2191 ev= NULL;
2192 }
2193
2194 if (ret != 0)
2195 goto end;
2196 }
2197 else
2198 {
2199 /*
2200 It is a Rotate_log_event, Format_description_log_event event or other
2201 type event doesn't belong to the transaction.
2202 */
2203 delete ev;
2204 ev= NULL;
2205 }
2206 }
2207 else
2208 {
2209 /*
2210 IO error happens if relay_io.error != 0, otherwise it arrives the
2211 end of the relay log
2212 */
2213 if (relay_io.error != 0)
2214 {
2215 sql_print_error("Error when worker read relay log events,"
2216 "relay log name %s, position %llu",
2217 rli->get_event_relay_log_name(), my_b_tell(&relay_io));
2218 goto end;
2219 }
2220
2221 if (rli->relay_log.find_next_relay_log(file_name))
2222 {
2223 sql_print_error("Failed to find next relay log when retrying the "
2224 "transaction, current relay log is %s", file_name);
2225 goto end;
2226 }
2227
2228 file_number= relay_log_name_to_number(file_name);
2229
2230 end_io_cache(&relay_io);
2231 mysql_file_close(relay_io.file, MYF(0));
2232 start_relay_pos= BIN_LOG_HEADER_SIZE;
2233 }
2234 }
2235
2236 error= false;
2237 end:
2238 if (my_b_inited(&relay_io))
2239 {
2240 end_io_cache(&relay_io);
2241 mysql_file_close(relay_io.file, MYF(0));
2242 }
2243 DBUG_RETURN(error);
2244 }
2245
2246 /*
2247 Find database entry from map_db_to_worker hash table.
2248 */
find_entry_from_db_map(const char * dbname,Relay_log_info * rli)2249 static db_worker_hash_entry *find_entry_from_db_map(const char *dbname,
2250 Relay_log_info *rli)
2251 {
2252 db_worker_hash_entry *entry= NULL;
2253 my_hash_value_type hash_value;
2254 uchar dblength= (uint) strlen(dbname);
2255
2256 hash_value= my_calc_hash(&rli->mapping_db_to_worker, (const uchar*) dbname,
2257 dblength);
2258
2259 mysql_mutex_lock(&rli->slave_worker_hash_lock);
2260
2261 entry= (db_worker_hash_entry *)
2262 my_hash_search_using_hash_value(&rli->mapping_db_to_worker, hash_value,
2263 (uchar*) dbname, dblength);
2264
2265 mysql_mutex_unlock(&rli->slave_worker_hash_lock);
2266 return entry;
2267 }
2268
2269 /*
2270 Initialize Log_event::mts_assigned_partitions array. It is for transaction
2271 retry and is only called when retrying a transaction by workers.
2272 */
assign_partition_db(Log_event * ev)2273 void Slave_worker::assign_partition_db(Log_event *ev)
2274 {
2275 Mts_db_names mts_dbs;
2276 int i;
2277
2278 ev->get_mts_dbs(&mts_dbs);
2279
2280 if (mts_dbs.num == OVER_MAX_DBS_IN_EVENT_MTS)
2281 ev->mts_assigned_partitions[0]= find_entry_from_db_map("", c_rli);
2282 else
2283 for (i= 0; i < mts_dbs.num; i++)
2284 ev->mts_assigned_partitions[i]= find_entry_from_db_map(mts_dbs.name[i],
2285 c_rli);
2286 }
2287
2288 // returns the next available! (TODO: incompatible to circurla_buff method!!!)
en_queue(Slave_jobs_queue * jobs,Slave_job_item * item)2289 static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item)
2290 {
2291 if (jobs->avail == jobs->size)
2292 {
2293 assert(jobs->avail == jobs->m_Q.size());
2294 return -1;
2295 }
2296
2297 // store
2298
2299 jobs->m_Q[jobs->avail]= *item;
2300
2301 // pre-boundary cond
2302 if (jobs->entry == jobs->size)
2303 jobs->entry= jobs->avail;
2304
2305 jobs->avail= (jobs->avail + 1) % jobs->size;
2306 jobs->len++;
2307
2308 // post-boundary cond
2309 if (jobs->avail == jobs->entry)
2310 jobs->avail= jobs->size;
2311 assert(jobs->avail == jobs->entry ||
2312 jobs->len == (jobs->avail >= jobs->entry) ?
2313 (jobs->avail - jobs->entry) : (jobs->size + jobs->avail - jobs->entry));
2314 return jobs->avail;
2315 }
2316
Retry_context_sentry(Slave_worker & parent)2317 Slave_worker::Retry_context_sentry::Retry_context_sentry(Slave_worker &parent)
2318 : m_parent(parent), m_is_cleaned_up(false) {}
2319
~Retry_context_sentry()2320 Slave_worker::Retry_context_sentry::~Retry_context_sentry() {
2321 this->clean();
2322 }
2323
operator =(bool is_cleaned_up)2324 Slave_worker::Retry_context_sentry &Slave_worker::Retry_context_sentry::operator=(
2325 bool is_cleaned_up)
2326 {
2327 this->m_is_cleaned_up = is_cleaned_up;
2328 return (*this);
2329 }
2330
clean()2331 void Slave_worker::Retry_context_sentry::clean() {
2332 if (!this->m_is_cleaned_up) {
2333 this->m_parent.cleanup_context(this->m_parent.info_thd, 1);
2334 this->m_parent.reset_order_commit_deadlock();
2335 this->m_is_cleaned_up = true;
2336 }
2337 }
2338
2339 /**
2340 return the value of @c data member of the head of the queue.
2341 */
head_queue(Slave_jobs_queue * jobs,Slave_job_item * ret)2342 void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
2343 {
2344 if (jobs->entry == jobs->size)
2345 {
2346 assert(jobs->len == 0);
2347 ret->data= NULL; // todo: move to caller
2348 return NULL;
2349 }
2350 *ret= jobs->m_Q[jobs->entry];
2351
2352 assert(ret->data); // todo: move to caller
2353
2354 return ret;
2355 }
2356
2357
2358 /**
2359 return a job item through a struct which point is supplied via argument.
2360 */
de_queue(Slave_jobs_queue * jobs,Slave_job_item * ret)2361 Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
2362 {
2363 if (jobs->entry == jobs->size)
2364 {
2365 assert(jobs->len == 0);
2366 return NULL;
2367 }
2368 *ret= jobs->m_Q[jobs->entry];
2369 jobs->len--;
2370
2371 // pre boundary cond
2372 if (jobs->avail == jobs->size)
2373 jobs->avail= jobs->entry;
2374 jobs->entry= (jobs->entry + 1) % jobs->size;
2375
2376 // post boundary cond
2377 if (jobs->avail == jobs->entry)
2378 jobs->entry= jobs->size;
2379
2380 assert(jobs->entry == jobs->size ||
2381 (jobs->len == (jobs->avail >= jobs->entry) ?
2382 (jobs->avail - jobs->entry) :
2383 (jobs->size + jobs->avail - jobs->entry)));
2384
2385 return ret;
2386 }
2387
2388 /**
2389 Coordinator enqueues a job item into a Worker private queue.
2390
2391 @param job_item a pointer to struct carrying a reference to an event
2392 @param worker a pointer to the assigned Worker struct
2393 @param rli a pointer to Relay_log_info of Coordinator
2394
2395 @return false Success.
2396 true Thread killed or worker stopped while waiting for
2397 successful enqueue.
2398 */
append_item_to_jobs(slave_job_item * job_item,Slave_worker * worker,Relay_log_info * rli)2399 bool append_item_to_jobs(slave_job_item *job_item,
2400 Slave_worker *worker, Relay_log_info *rli)
2401 {
2402 THD *thd= rli->info_thd;
2403 int ret= -1;
2404 size_t ev_size= job_item->data->common_header->data_written;
2405 ulonglong new_pend_size;
2406 PSI_stage_info old_stage;
2407
2408 assert(thd == current_thd);
2409
2410 mysql_mutex_lock(&rli->pending_jobs_lock);
2411 new_pend_size= rli->mts_pending_jobs_size + ev_size;
2412 bool big_event= (ev_size > rli->mts_pending_jobs_size_max);
2413 /*
2414 C waits basing on *data* sizes in the queues.
2415 If it is a big event (event size is greater than
2416 slave_pending_jobs_size_max but less than slave_max_allowed_packet),
2417 it will wait for all the jobs in the workers's queue to be
2418 completed. If it is normal event (event size is less than
2419 slave_pending_jobs_size_max), then it will wait for
2420 enough empty memory to keep the event in one of the workers's
2421 queue.
2422 NOTE: Receiver thread (I/O thread) is taking care of restricting
2423 the event size to slave_max_allowed_packet. If an event from
2424 the master is bigger than this value, IO thread will be stopped
2425 with error ER_NET_PACKET_TOO_LARGE.
2426 */
2427 while ( (!big_event && new_pend_size > rli->mts_pending_jobs_size_max)
2428 || (big_event && rli->mts_pending_jobs_size != 0 ))
2429 {
2430 rli->mts_wq_oversize= TRUE;
2431 rli->wq_size_waits_cnt++; // waiting due to the total size
2432 thd->ENTER_COND(&rli->pending_jobs_cond, &rli->pending_jobs_lock,
2433 &stage_slave_waiting_worker_to_free_events, &old_stage);
2434 mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
2435 mysql_mutex_unlock(&rli->pending_jobs_lock);
2436 thd->EXIT_COND(&old_stage);
2437 if (thd->killed)
2438 return true;
2439 if (rli->wq_size_waits_cnt % 10 == 1)
2440 sql_print_information("Multi-threaded slave: Coordinator has waited "
2441 "%lu times hitting slave_pending_jobs_size_max; "
2442 "current event size = %zu.",
2443 rli->wq_size_waits_cnt, ev_size);
2444 mysql_mutex_lock(&rli->pending_jobs_lock);
2445
2446 new_pend_size= rli->mts_pending_jobs_size + ev_size;
2447 }
2448 rli->pending_jobs++;
2449 rli->mts_pending_jobs_size= new_pend_size;
2450 rli->mts_events_assigned++;
2451
2452 mysql_mutex_unlock(&rli->pending_jobs_lock);
2453
2454 /*
2455 Sleep unless there is an underrunning Worker and the current Worker
2456 queue is empty or filled lightly (not more than underrun level).
2457 */
2458 if (rli->mts_wq_underrun_w_id == MTS_WORKER_UNDEF &&
2459 worker->jobs.len > worker->underrun_level)
2460 {
2461 /*
2462 todo: experiment with weight to get a good approximation formula.
2463 Max possible nap time is choosen 1 ms.
2464 The bigger the excessive overrun counter the longer the nap.
2465 */
2466 ulong nap_weight= rli->mts_wq_excess_cnt + 1;
2467 /*
2468 Nap time is a product of a weight factor and the basic nap unit.
2469 The weight factor is proportional to the worker queues overrun excess
2470 counter. For example when there were only one overruning Worker
2471 the max nap_weight as 0.1 * worker->jobs.size would be
2472 about 1600 so the max nap time is approx 0.008 secs.
2473 Such value is not reachable because of min().
2474 Notice, granularity of sleep depends on the resolution of the software
2475 clock, High-Resolution Timer (HRT) configuration. Without HRT
2476 the precision of wake-up through @c select() may be greater or
2477 equal 1 ms. So don't expect the nap last a prescribed fraction of 1 ms
2478 in such case.
2479 */
2480 my_sleep(min<ulong>(1000, nap_weight * rli->mts_coordinator_basic_nap));
2481 rli->mts_wq_no_underrun_cnt++;
2482 }
2483
2484 mysql_mutex_lock(&worker->jobs_lock);
2485
2486 // possible WQ overfill
2487 while (worker->running_status == Slave_worker::RUNNING && !thd->killed &&
2488 (ret= en_queue(&worker->jobs, job_item)) == -1)
2489 {
2490 thd->ENTER_COND(&worker->jobs_cond, &worker->jobs_lock,
2491 &stage_slave_waiting_worker_queue, &old_stage);
2492 worker->jobs.overfill= TRUE;
2493 worker->jobs.waited_overfill++;
2494 rli->mts_wq_overfill_cnt++;
2495 mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock);
2496 mysql_mutex_unlock(&worker->jobs_lock);
2497 thd->EXIT_COND(&old_stage);
2498
2499 mysql_mutex_lock(&worker->jobs_lock);
2500 }
2501 if (ret != -1)
2502 {
2503 worker->curr_jobs++;
2504 if (worker->jobs.len == 1)
2505 mysql_cond_signal(&worker->jobs_cond);
2506
2507 mysql_mutex_unlock(&worker->jobs_lock);
2508 }
2509 else
2510 {
2511 mysql_mutex_unlock(&worker->jobs_lock);
2512
2513 mysql_mutex_lock(&rli->pending_jobs_lock);
2514 rli->pending_jobs--; // roll back of the prev incr
2515 rli->mts_pending_jobs_size -= ev_size;
2516 mysql_mutex_unlock(&rli->pending_jobs_lock);
2517 }
2518
2519 return (-1 != ret ? false : true);
2520 }
2521
2522 /**
2523 Remove a job item from the given workers job queue. It also updates related
2524 status.
2525
2526 param[in] job_item The job item will be removed
2527 param[in] worker The worker which job_item belongs to.
2528 param[in] rli slave's relay log info object.
2529 */
remove_item_from_jobs(slave_job_item * job_item,Slave_worker * worker,Relay_log_info * rli)2530 static void remove_item_from_jobs(slave_job_item *job_item,
2531 Slave_worker *worker, Relay_log_info *rli)
2532 {
2533 Log_event *ev= job_item->data;
2534
2535 mysql_mutex_lock(&worker->jobs_lock);
2536 de_queue(&worker->jobs, job_item);
2537 /* possible overfill */
2538 if (worker->jobs.len == worker->jobs.size - 1 &&
2539 worker->jobs.overfill == TRUE)
2540 {
2541 worker->jobs.overfill= false;
2542 // todo: worker->hungry_cnt++;
2543 mysql_cond_signal(&worker->jobs_cond);
2544 }
2545 mysql_mutex_unlock(&worker->jobs_lock);
2546
2547 /* statistics */
2548
2549 /* todo: convert to rwlock/atomic write */
2550 mysql_mutex_lock(&rli->pending_jobs_lock);
2551
2552 rli->pending_jobs--;
2553 rli->mts_pending_jobs_size-= ev->common_header->data_written;
2554 assert(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);
2555
2556 /*
2557 The positive branch is underrun: number of pending assignments
2558 is less than underrun level.
2559 Zero of jobs.len has to reset underrun w_id as the worker may get
2560 the next piece of assignement in a long time.
2561 */
2562 if (worker->underrun_level > worker->jobs.len && worker->jobs.len != 0)
2563 {
2564 rli->mts_wq_underrun_w_id= worker->id;
2565 } else if (rli->mts_wq_underrun_w_id == worker->id)
2566 {
2567 // reset only own marking
2568 rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF;
2569 }
2570
2571 /*
2572 Overrun handling.
2573 Incrementing the Worker private and the total excess counter corresponding
2574 to number of events filled above the overrun_level.
2575 The increment amount to the total counter is a difference between
2576 the current and the previous private excess (worker->wq_overrun_cnt).
2577 When the current queue length drops below overrun_level the global
2578 counter is decremented, the local is reset.
2579 */
2580 if (worker->overrun_level < worker->jobs.len)
2581 {
2582 ulong last_overrun= worker->wq_overrun_cnt;
2583 ulong excess_delta;
2584
2585 /* current overrun */
2586 worker->wq_overrun_cnt= worker->jobs.len - worker->overrun_level;
2587 excess_delta= worker->wq_overrun_cnt - last_overrun;
2588 worker->excess_cnt+= excess_delta;
2589 rli->mts_wq_excess_cnt+= excess_delta;
2590 rli->mts_wq_overrun_cnt++; // statistics
2591
2592 // guarding correctness of incrementing in case of the only one Worker
2593 assert(rli->workers.size() != 1 ||
2594 rli->mts_wq_excess_cnt == worker->wq_overrun_cnt);
2595 }
2596 else if (worker->excess_cnt > 0)
2597 {
2598 // When level drops below the total excess is decremented by the
2599 // value of the worker's contribution to the total excess.
2600 rli->mts_wq_excess_cnt-= worker->excess_cnt;
2601 worker->excess_cnt= 0;
2602 worker->wq_overrun_cnt= 0; // and the local is reset
2603
2604 assert(rli->mts_wq_excess_cnt >= 0);
2605 assert(rli->mts_wq_excess_cnt == 0 || rli->workers.size() > 1);
2606
2607 }
2608
2609 /* coordinator can be waiting */
2610 if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max &&
2611 rli->mts_wq_oversize) // TODO: unit/general test wq_oversize
2612 {
2613 rli->mts_wq_oversize= FALSE;
2614 mysql_cond_signal(&rli->pending_jobs_cond);
2615 }
2616
2617 mysql_mutex_unlock(&rli->pending_jobs_lock);
2618
2619 worker->events_done++;
2620 }
2621 /**
2622 Worker's routine to wait for a new assignement through
2623 @c append_item_to_jobs()
2624
2625 @param worker a pointer to the waiting Worker struct
2626 @param job_item a pointer to struct carrying a reference to an event
2627
2628 @return NULL failure or
2629 a-pointer to an item.
2630 */
pop_jobs_item(Slave_worker * worker,Slave_job_item * job_item)2631 struct slave_job_item* pop_jobs_item(Slave_worker *worker,
2632 Slave_job_item *job_item)
2633 {
2634 THD *thd= worker->info_thd;
2635
2636 mysql_mutex_lock(&worker->jobs_lock);
2637
2638 job_item->data= NULL;
2639 while (!job_item->data && !thd->killed &&
2640 (worker->running_status == Slave_worker::RUNNING ||
2641 worker->running_status == Slave_worker::STOP))
2642 {
2643 PSI_stage_info old_stage;
2644
2645 if (set_max_updated_index_on_stop(worker, job_item))
2646 break;
2647 if (job_item->data == NULL)
2648 {
2649 worker->wq_empty_waits++;
2650 thd->ENTER_COND(&worker->jobs_cond, &worker->jobs_lock,
2651 &stage_slave_waiting_event_from_coordinator,
2652 &old_stage);
2653 mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock);
2654 mysql_mutex_unlock(&worker->jobs_lock);
2655 thd->EXIT_COND(&old_stage);
2656 mysql_mutex_lock(&worker->jobs_lock);
2657 }
2658 }
2659 if (job_item->data)
2660 worker->curr_jobs--;
2661
2662 mysql_mutex_unlock(&worker->jobs_lock);
2663
2664 thd_proc_info(worker->info_thd, "Executing event");
2665 return job_item;
2666 }
2667
2668 /**
2669 Report a not yet reported error to the coordinator if necessary.
2670
2671 All issues detected when applying binary log events are reported using
2672 rli->report(), but when an issue is not reported by the log event being
2673 applied, there is a workaround at handle_slave_sql() to report the issue
2674 also using rli->report() for the STS applier (or the MTS coordinator).
2675
2676 This function implements the workaround for a MTS worker.
2677
2678 @param worker the worker to be evaluated.
2679 */
report_error_to_coordinator(Slave_worker * worker)2680 void report_error_to_coordinator(Slave_worker *worker)
2681 {
2682 THD *thd= worker->info_thd;
2683 /*
2684 It is possible that the worker had failed to apply the event but
2685 did not reported about the failure using rli->report(). An example
2686 of such cases are failures caused by setting GTID_NEXT variable with
2687 an unsupported GTID mode (GTID_SET when GTID_MODE = OFF, anonymous
2688 GTID when GTID_MODE = ON).
2689 */
2690 if (thd->is_error())
2691 {
2692 char const *const errmsg= thd->get_stmt_da()->message_text();
2693 DBUG_PRINT("info",
2694 ("thd->get_stmt_da()->get_mysql_errno()=%d; "
2695 "worker->last_error.number=%d",
2696 thd->get_stmt_da()->mysql_errno(),
2697 worker->last_error().number));
2698
2699 if (worker->last_error().number == 0 &&
2700 /*
2701 When another worker that should commit before the current worker
2702 being evaluated has failed and the commit order should be preserved
2703 the current worker was asked to roll back and would stop with the
2704 ER_SLAVE_WORKER_STOPPED_PREVIOUS_THD_ERROR not yet reported to the
2705 coordinator. Reporting this error to the coordinator would be a
2706 mistake and would mask the real issue that lead to the MTS stop as
2707 the coordinator reports only the last error reported to it as the
2708 cause of the MTS failure.
2709
2710 So, we should skip reporting the error if it was reported because
2711 the current transaction had to be rolled back by a failure in a
2712 previous transaction in the commit order while the current
2713 transaction was waiting to be committed.
2714 */
2715 thd->get_stmt_da()->mysql_errno() !=
2716 ER_SLAVE_WORKER_STOPPED_PREVIOUS_THD_ERROR)
2717 {
2718 /*
2719 This function is reporting an error which was not reported
2720 while executing exec_relay_log_event().
2721 */
2722 worker->report(ERROR_LEVEL, thd->get_stmt_da()->mysql_errno(),
2723 "%s", errmsg);
2724 }
2725 }
2726 }
2727
2728 /**
2729 apply one job group.
2730
2731 @note the function maintains worker's CGEP and modifies APH, updates
2732 the current group item in GAQ via @c slave_worker_ends_group().
2733
2734 param[in] worker the worker which calls it.
2735 param[in] rli slave's relay log info object.
2736
2737 return returns 0 if the group of jobs are applied successfully, otherwise
2738 returns an error code.
2739 */
slave_worker_exec_job_group(Slave_worker * worker,Relay_log_info * rli)2740 int slave_worker_exec_job_group(Slave_worker *worker, Relay_log_info *rli)
2741 {
2742 struct slave_job_item item= {NULL, 0, 0};
2743 struct slave_job_item *job_item= &item;
2744 THD *thd= worker->info_thd;
2745 bool seen_gtid= false;
2746 bool seen_begin= false;
2747 int error= 0;
2748 Log_event *ev= NULL;
2749 uint start_relay_number;
2750 my_off_t start_relay_pos;
2751
2752 DBUG_ENTER("slave_worker_exec_job_group");
2753
2754 if (unlikely(worker->trans_retries > 0))
2755 worker->trans_retries= 0;
2756
2757 job_item= pop_jobs_item(worker, job_item);
2758 start_relay_number= job_item->relay_number;
2759 start_relay_pos= job_item->relay_pos;
2760
2761 while (1)
2762 {
2763 Slave_job_group *ptr_g;
2764
2765 if (unlikely(thd->killed || worker->running_status == Slave_worker::STOP_ACCEPTED))
2766 {
2767 assert(worker->running_status != Slave_worker::ERROR_LEAVING);
2768 // de-queueing and decrement counters is in the caller's exit branch
2769 error= -1;
2770 goto err;
2771 }
2772
2773 ev= job_item->data;
2774 assert(ev != NULL);
2775 DBUG_PRINT("info", ("W_%lu <- job item: %p data: %p thd: %p",
2776 worker->id, job_item, ev, thd));
2777 if (is_gtid_event(ev))
2778 seen_gtid= true;
2779 if (!seen_begin && ev->starts_group())
2780 {
2781 seen_begin= true; // The current group is started with B-event
2782 worker->end_group_sets_max_dbs= true;
2783 }
2784 set_timespec_nsec(&worker->ts_exec[0], 0); // pre-exec
2785 worker->stats_read_time += diff_timespec(&worker->ts_exec[0],
2786 &worker->ts_exec[1]);
2787 /* Adapting to possible new Format_description_log_event */
2788 ptr_g= rli->gaq->get_job_group(ev->mts_group_idx);
2789 if (ptr_g->new_fd_event)
2790 {
2791 worker->set_rli_description_event(ptr_g->new_fd_event);
2792 ptr_g->new_fd_event= NULL;
2793 }
2794
2795 error= worker->slave_worker_exec_event(ev);
2796
2797 set_timespec_nsec(&worker->ts_exec[1], 0); // pre-exec
2798 worker->stats_exec_time += diff_timespec(&worker->ts_exec[1],
2799 &worker->ts_exec[0]);
2800 if (error || worker->found_order_commit_deadlock())
2801 {
2802 worker->prepare_for_retry(*ev);
2803 error= worker->retry_transaction(start_relay_number, start_relay_pos,
2804 job_item->relay_number,
2805 job_item->relay_pos);
2806 if (error)
2807 goto err;
2808 }
2809 /*
2810 p-event or any other event of B-free (malformed) group can
2811 "commit" with logical clock scheduler. In that case worker id
2812 points to the only active "exclusive" Worker that processes such
2813 malformed group events one by one.
2814 WL#7592 refines the original assert disjunction formula
2815 with the final disjunct.
2816 */
2817 assert(seen_begin || is_gtid_event(ev) ||
2818 ev->get_type_code() == binary_log::QUERY_EVENT ||
2819 is_mts_db_partitioned(rli) || worker->id == 0 || seen_gtid);
2820
2821 if (ev->ends_group() ||
2822 (!seen_begin && !is_gtid_event(ev) &&
2823 (ev->get_type_code() == binary_log::QUERY_EVENT ||
2824 /* break through by LC only in GTID off */
2825 (!seen_gtid && !is_mts_db_partitioned(rli)))))
2826 break;
2827
2828 remove_item_from_jobs(job_item, worker, rli);
2829 /* The event will be used later if worker is NULL, so it is not freed */
2830 if (ev->worker != NULL)
2831 delete ev;
2832
2833 job_item= pop_jobs_item(worker, job_item);
2834 }
2835
2836 DBUG_PRINT("info", (" commits GAQ index %lu, last committed %lu",
2837 ev->mts_group_idx, worker->last_group_done_index));
2838 /* The group is applied successfully, so error should be 0 */
2839 worker->slave_worker_ends_group(ev, 0);
2840
2841 #ifndef NDEBUG
2842 DBUG_PRINT("mts", ("Check_slave_debug_group worker %lu mts_checkpoint_group"
2843 " %u processed %lu debug %d\n", worker->id, opt_mts_checkpoint_group,
2844 worker->groups_done,
2845 DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0)));
2846
2847 if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
2848 opt_mts_checkpoint_group == worker->groups_done)
2849 {
2850 DBUG_PRINT("mts", ("Putting worker %lu in busy wait.", worker->id));
2851 while (true) my_sleep(6000000);
2852 }
2853 #endif
2854
2855 remove_item_from_jobs(job_item, worker, rli);
2856 delete ev;
2857
2858 DBUG_RETURN(0);
2859 err:
2860 if (error)
2861 {
2862 report_error_to_coordinator(worker);
2863 DBUG_PRINT("info", ("Worker %lu is exiting: killed %i, error %i, "
2864 "running_status %d",
2865 worker->id, thd->killed, thd->is_error(),
2866 worker->running_status));
2867 worker->slave_worker_ends_group(ev, error); /* last done sets post exec */
2868 }
2869 DBUG_RETURN(error);
2870 }
2871
get_for_channel_str(bool upper_case) const2872 const char* Slave_worker::get_for_channel_str(bool upper_case) const
2873 {
2874 return c_rli->get_for_channel_str(upper_case);
2875 }
2876
get_table_pk_field_indexes()2877 const uint* Slave_worker::get_table_pk_field_indexes()
2878 {
2879 return info_slave_worker_table_pk_field_indexes;
2880 }
2881
get_channel_field_index()2882 uint Slave_worker::get_channel_field_index()
2883 {
2884 return LINE_FOR_CHANNEL;
2885 }
2886