1 /* Copyright (c) 2013, 2020, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include "sql/rpl_mts_submode.h"
24
25 #include <limits.h>
26 #include <string.h>
27 #include <time.h>
28 #include <memory>
29
30 #include "lex_string.h"
31 #include "libbinlogevents/include/compression/iterator.h"
32 #include "m_string.h"
33 #include "my_byteorder.h"
34 #include "my_compiler.h"
35 #include "my_dbug.h"
36 #include "my_inttypes.h"
37 #include "my_loglevel.h"
38 #include "my_systime.h"
39 #include "my_thread.h"
40 #include "mysql/components/services/log_builtins.h"
41 #include "mysql/components/services/psi_stage_bits.h"
42 #include "mysql/psi/mysql_cond.h"
43 #include "mysql/psi/mysql_mutex.h"
44 #include "mysqld_error.h"
45 #include "sql/binlog_reader.h"
46 #include "sql/debug_sync.h"
47 #include "sql/log.h"
48 #include "sql/log_event.h" // Query_log_event
49 #include "sql/mdl.h"
50 #include "sql/mysqld.h" // stage_worker_....
51 #include "sql/query_options.h"
52 #include "sql/rpl_filter.h"
53 #include "sql/rpl_rli.h" // Relay_log_info
54 #include "sql/rpl_rli_pdb.h" // db_worker_hash_entry
55 #include "sql/rpl_slave.h"
56 #include "sql/rpl_slave_commit_order_manager.h" // Commit_order_manager
57 #include "sql/sql_class.h" // THD
58 #include "sql/system_variables.h"
59 #include "sql/table.h"
60
61 /**
62 Does necessary arrangement before scheduling next event.
63 @return 1 if error
64 0 no error
65 */
schedule_next_event(Relay_log_info *,Log_event *)66 int Mts_submode_database::schedule_next_event(Relay_log_info *, Log_event *) {
67 /*nothing to do here*/
68 return 0;
69 }
70
71 /**
72 Logic to attach temporary tables.
73 */
attach_temp_tables(THD * thd,const Relay_log_info *,Query_log_event * ev)74 void Mts_submode_database::attach_temp_tables(THD *thd, const Relay_log_info *,
75 Query_log_event *ev) {
76 int i, parts;
77 DBUG_TRACE;
78 if (!is_mts_worker(thd) || (ev->ends_group() || ev->starts_group())) return;
79 DBUG_ASSERT(!thd->temporary_tables);
80 // in over max-db:s case just one special partition is locked
81 parts = ((ev->mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
82 ? 1
83 : ev->mts_accessed_dbs);
84 for (i = 0; i < parts; i++) {
85 mts_move_temp_tables_to_thd(
86 thd, ev->mts_assigned_partitions[i]->temporary_tables);
87 ev->mts_assigned_partitions[i]->temporary_tables = nullptr;
88 }
89 }
90
91 /**
92 Function is called by Coordinator when it identified an event
93 requiring sequential execution.
94 Creating sequential context for the event includes waiting
95 for the assigned to Workers tasks to be completed and their
96 resources such as temporary tables be returned to Coordinator's
97 repository.
98 In case all workers are waited Coordinator changes its group status.
99
100 @param rli Relay_log_info instance of Coordinator
101 @param ignore Optional Worker instance pointer if the sequential context
102 is established due for the ignore Worker. Its resources
103 are to be retained.
104
105 @note Resources that are not occupied by Workers such as
106 a list of temporary tables held in unused (zero-usage) records
107 of APH are relocated to the Coordinator placeholder.
108
109 @return non-negative number of released by Workers partitions
110 (one partition by one Worker can count multiple times)
111
112 or -1 to indicate there has been a failure on a not-ignored Worker
113 as indicated by its running_status so synchronization can't succeed.
114 */
115
wait_for_workers_to_finish(Relay_log_info * rli,Slave_worker * ignore)116 int Mts_submode_database::wait_for_workers_to_finish(Relay_log_info *rli,
117 Slave_worker *ignore) {
118 uint ret = 0;
119 THD *thd = rli->info_thd;
120 bool cant_sync = false;
121 char llbuf[22];
122
123 DBUG_TRACE;
124
125 llstr(rli->get_event_relay_log_pos(), llbuf);
126 DBUG_PRINT("info", ("Coordinator and workers enter synchronization "
127 "procedure when scheduling event relay-log: %s "
128 "pos: %s",
129 rli->get_event_relay_log_name(), llbuf));
130
131 mysql_mutex_lock(&rli->slave_worker_hash_lock);
132
133 for (const auto &key_and_value : rli->mapping_db_to_worker) {
134 db_worker_hash_entry *entry = key_and_value.second.get();
135 DBUG_ASSERT(entry);
136
137 // the ignore Worker retains its active resources
138 if (ignore && entry->worker == ignore && entry->usage > 0) {
139 continue;
140 }
141
142 if (entry->usage > 0 && !thd->killed) {
143 PSI_stage_info old_stage;
144 Slave_worker *w_entry = entry->worker;
145
146 entry->worker = nullptr; // mark Worker to signal when usage drops to 0
147 thd->ENTER_COND(
148 &rli->slave_worker_hash_cond, &rli->slave_worker_hash_lock,
149 &stage_slave_waiting_worker_to_release_partition, &old_stage);
150 do {
151 mysql_cond_wait(&rli->slave_worker_hash_cond,
152 &rli->slave_worker_hash_lock);
153 DBUG_PRINT("info", ("Either got awakened of notified: "
154 "entry %p, usage %lu, worker %lu",
155 entry, entry->usage, w_entry->id));
156 } while (entry->usage != 0 && !thd->killed);
157 entry->worker =
158 w_entry; // restoring last association, needed only for assert
159 mysql_mutex_unlock(&rli->slave_worker_hash_lock);
160 thd->EXIT_COND(&old_stage);
161 ret++;
162 } else {
163 mysql_mutex_unlock(&rli->slave_worker_hash_lock);
164 }
165 // resources relocation
166 mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
167 entry->temporary_tables = nullptr;
168 if (entry->worker->running_status != Slave_worker::RUNNING)
169 cant_sync = true;
170 mysql_mutex_lock(&rli->slave_worker_hash_lock);
171 }
172
173 mysql_mutex_unlock(&rli->slave_worker_hash_lock);
174
175 if (!ignore) {
176 DBUG_PRINT("info", ("Coordinator synchronized with workers, "
177 "waited entries: %d, cant_sync: %d",
178 ret, cant_sync));
179
180 rli->mts_group_status = Relay_log_info::MTS_NOT_IN_GROUP;
181 }
182
183 return !cant_sync ? ret : -1;
184 }
185
unfold_transaction_payload_event(Format_description_event & fde,Transaction_payload_log_event & tple,std::vector<Log_event * > & events)186 bool Mts_submode_database::unfold_transaction_payload_event(
187 Format_description_event &fde, Transaction_payload_log_event &tple,
188 std::vector<Log_event *> &events) {
189 bool error = false;
190 /*
191 disable checksums - there are no checksums for events inside the tple
192 otherwise, the last 4 bytes would be truncated.
193
194 We do this by copying the fdle from the rli. Then we disable the checksum
195 in the copy. Then we use it to decode the events in the payload instead
196 of the original fdle.
197
198 We allocate the fdle copy in the stack.
199
200 TODO: simplify this by breaking the binlog_event_deserialize API
201 and make it take a single boolean instead that states whether the
202 event has a checksum in it or not.
203 */
204 Format_description_log_event fdle(fde.reader().buffer(), &fde);
205 fdle.footer()->checksum_alg = binary_log::BINLOG_CHECKSUM_ALG_OFF;
206 fdle.register_temp_buf(const_cast<char *>(fde.reader().buffer()), false);
207
208 // unpack the event
209 binary_log::transaction::compression::Iterable_buffer it(
210 tple.get_payload(), tple.get_payload_size(), tple.get_uncompressed_size(),
211 tple.get_compression_type());
212
213 for (auto ptr : it) {
214 Log_event *next = nullptr;
215 size_t event_len = uint4korr(ptr + EVENT_LEN_OFFSET);
216 if (binlog_event_deserialize(reinterpret_cast<const unsigned char *>(ptr),
217 event_len, &fdle, true, &next)) {
218 error = true; /* purecov: inspected */
219 break; /* purecov: inspected */
220 } else {
221 DBUG_ASSERT(next != nullptr);
222 events.push_back(next);
223 }
224 }
225
226 return error;
227 }
228
set_multi_threaded_applier_context(const Relay_log_info & rli,Log_event & ev)229 bool Mts_submode_database::set_multi_threaded_applier_context(
230 const Relay_log_info &rli, Log_event &ev) {
231 bool error = false;
232
233 // if this is a transaction payload event, we need to set the proper
234 // databases that its internal events update
235 if (ev.get_type_code() == binary_log::TRANSACTION_PAYLOAD_EVENT) {
236 Mts_db_names toset;
237 bool max_mts_dbs_in_event = false;
238 std::set<std::string> dbs;
239 auto &tple = static_cast<Transaction_payload_log_event &>(ev);
240 std::vector<Log_event *> events;
241 unfold_transaction_payload_event(*rli.get_rli_description_event(), tple,
242 events);
243
244 for (auto inner : events) {
245 Mts_db_names mts_dbs;
246
247 // This transaction payload event is already marked to run in
248 // isolation or the event being handled does not contain partition
249 // information
250 if (max_mts_dbs_in_event || !inner->contains_partition_info(true)) {
251 delete inner;
252 continue;
253 }
254
255 // The following queries should run in isolation, thence setting
256 // OVER_MAX_DBS_IN_EVENT_MTS
257 if ((inner->get_type_code() == binary_log::QUERY_EVENT)) {
258 auto qev = static_cast<Query_log_event *>(inner);
259 if (qev->is_query_prefix_match(STRING_WITH_LEN("XA COMMIT")) ||
260 qev->is_query_prefix_match(STRING_WITH_LEN("XA ROLLBACK"))) {
261 max_mts_dbs_in_event = true;
262 delete inner;
263 continue;
264 }
265 }
266
267 // OK, now that we have ruled the exceptions, lets handle the databases
268 // in the inner event.
269 inner->get_mts_dbs(&mts_dbs, rli.rpl_filter);
270
271 // inner event has mark to run in isolation
272 if (mts_dbs.num == OVER_MAX_DBS_IN_EVENT_MTS) {
273 max_mts_dbs_in_event = true;
274 delete inner;
275 continue;
276 }
277
278 // iterate over the databases and add them to the set
279 for (int i = 0; i < mts_dbs.num; i++) {
280 dbs.insert(mts_dbs.name[i]);
281 if (dbs.size() == MAX_DBS_IN_EVENT_MTS) {
282 max_mts_dbs_in_event = true;
283 break;
284 }
285 }
286
287 // inner event not needed anymore. Delete.
288 delete inner;
289 }
290
291 // now set the database information in the event
292 if (max_mts_dbs_in_event) {
293 toset.name[0] = "\0";
294 toset.num = OVER_MAX_DBS_IN_EVENT_MTS;
295 } else {
296 int i = 0;
297 // set the databases
298 for (auto &db : dbs) toset.name[i++] = db.c_str();
299
300 // set the number of databases
301 toset.num = dbs.size();
302 }
303
304 // save the mts_dbs to the payload event
305 tple.set_mts_dbs(toset);
306 }
307
308 return error;
309 }
310
/**
  Logic to detach the temporary tables from the worker threads upon
  event execution.

  The partitions assigned to @p ev first have their temporary-table lists
  cleared. Every temporary table of the worker THD is then moved to the
  partition whose accessed database name matches the table's database.

  @param thd THD instance
  @param rli Relay_log_info pointer; its rpl_filter is consulted for
             database rewrite rules
  @param ev  Query_log_event that is being applied
*/
void Mts_submode_database::detach_temp_tables(THD *thd,
                                              const Relay_log_info *rli,
                                              Query_log_event *ev) {
  DBUG_TRACE;
  if (!is_mts_worker(thd)) return;
  // In the over-max-dbs case only one special partition is used.
  int parts = ((ev->mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
                   ? 1
                   : ev->mts_accessed_dbs);
  /*
    todo: optimize for a case of

    a. one db
       Only detaching temporary_tables from thd to entry would be required
       instead of the double-loop below.

    b. unchanged thd->temporary_tables.
       In such case the involved entries would continue to hold the
       unmodified lists provided that the attach_ method does not
       destroy references to them.
  */
  // The partitions' lists are rebuilt from thd->temporary_tables below.
  for (int i = 0; i < parts; i++) {
    ev->mts_assigned_partitions[i]->temporary_tables = nullptr;
  }

  Rpl_filter *rpl_filter = rli->rpl_filter;
  for (TABLE *table = thd->temporary_tables; table;) {
    int i;
    const char *db_name = nullptr;

    // find which entry to go
    for (i = 0; i < parts; i++) {
      db_name = ev->mts_accessed_db_names[i];
      // An empty accessed name ends the search and selects partition i.
      if (!strlen(db_name)) break;
      // Only default database is rewritten.
      if (!rpl_filter->is_rewrite_empty() && !strcmp(ev->get_db(), db_name)) {
        size_t dummy_len;
        const char *db_filtered =
            rpl_filter->get_rewrite_db(db_name, &dummy_len);
        // db_name != db_filtered means that db_name is rewritten.
        if (strcmp(db_name, db_filtered)) db_name = db_filtered;
      }
      // Without rewrite rules the scan relies on the order of
      // mts_accessed_db_names: it stops at the first name that does not
      // compare greater than the table's database name.
      if (strcmp(table->s->db.str, db_name) < 0)
        continue;
      else {
        // When rewrite db rules are used we can not rely on
        // mts_accessed_db_names elements order.
        if (!rpl_filter->is_rewrite_empty() &&
            strcmp(table->s->db.str, db_name))
          continue;
        else
          break;
      }
    }
    DBUG_ASSERT(db_name &&
                (!strcmp(table->s->db.str, db_name) || !strlen(db_name)));
    DBUG_ASSERT(i < ev->mts_accessed_dbs);
    // table pointer is shifted inside the function: the returned value is
    // the next table to process.
    table = mts_move_temp_table_to_entry(table, thd,
                                         ev->mts_assigned_partitions[i]);
  }

  DBUG_ASSERT(!thd->temporary_tables);
#ifndef DBUG_OFF
  // Each partition's list, if any, must start at a head node (no prev).
  for (int i = 0; i < parts; i++) {
    DBUG_ASSERT(!ev->mts_assigned_partitions[i]->temporary_tables ||
                !ev->mts_assigned_partitions[i]->temporary_tables->prev);
  }
#endif
}
387
388 /**
389 Logic to get least occupied worker when the sql mts_submode= database
390 @param ws array of worker threads
391 @return slave worker thread
392 */
get_least_occupied_worker(Relay_log_info *,Slave_worker_array * ws,Log_event *)393 Slave_worker *Mts_submode_database::get_least_occupied_worker(
394 Relay_log_info *, Slave_worker_array *ws, Log_event *) {
395 long usage = LONG_MAX;
396 Slave_worker **ptr_current_worker = nullptr, *worker = nullptr;
397
398 DBUG_TRACE;
399
400 #ifndef DBUG_OFF
401
402 if (DBUG_EVALUATE_IF("mts_distribute_round_robin", 1, 0)) {
403 worker = ws->at(w_rr % ws->size());
404 LogErr(INFORMATION_LEVEL, ER_RPL_WORKER_ID_IS, worker->id,
405 static_cast<ulong>(w_rr % ws->size()));
406 DBUG_ASSERT(worker != nullptr);
407 return worker;
408 }
409 #endif
410
411 for (Slave_worker **it = ws->begin(); it != ws->end(); ++it) {
412 ptr_current_worker = it;
413 if ((*ptr_current_worker)->usage_partition <= usage) {
414 worker = *ptr_current_worker;
415 usage = (*ptr_current_worker)->usage_partition;
416 }
417 }
418 DBUG_ASSERT(worker != nullptr);
419 return worker;
420 }
421
422 /* MTS submode master Default constructor */
Mts_submode_logical_clock()423 Mts_submode_logical_clock::Mts_submode_logical_clock() {
424 type = MTS_PARALLEL_TYPE_LOGICAL_CLOCK;
425 first_event = true;
426 force_new_group = false;
427 is_new_group = true;
428 delegated_jobs = 0;
429 jobs_done = 0;
430 last_lwm_timestamp = SEQ_UNINIT;
431 last_lwm_index = INDEX_UNDEF;
432 is_error = false;
433 min_waited_timestamp = SEQ_UNINIT;
434 last_committed = SEQ_UNINIT;
435 sequence_number = SEQ_UNINIT;
436 }
437
/**
  The method finds the minimum logical timestamp (low-water-mark) of
  committed transactions.
  The successful search results in a pair of a logical timestamp value and a
  GAQ index that contains it. last_lwm_timestamp may still be raised though the
  search does not find any satisfying running index. Search is implemented as
  headway scanning of GAQ from a point of a previous search's stop position
  (last_lwm_index). The cached (memorized) index value is considered to be
  stale when its timestamp becomes less than or equal to the current "stable"
  LWM:

    last_lwm_timestamp <= GAQ.lwm.sequence_number (*)

  Staleness is caused by GAQ garbage collection that increments the rhs of (*),
  see @c move_queue_head(). When that's diagnosed, the search in GAQ needs
  restarting from the queue tail.

  Formally, the undefined cached value of last_lwm_timestamp is also stale.

  @verbatim
     the last time index containing lwm
                 +------+
                 | LWM  |
                 |  |   |
                 V  V   V
      GAQ: xoooooxxxxxXXXXX...X
                   ^   ^
                   |   | LWM+1
                   |
                   +- the new current_lwm

       <---- logical (commit) time ----
  @endverbatim

  here `x' stands for committed, `X' for committed and discarded from
  the running range of the queue, `o' for not committed.

  @param rli        Relay_log_info pointer
  @param need_lock  Despite its name, true means the CALLER already holds
                    rli->mts_gaq_LOCK. False makes this function acquire and
                    release the mutex itself. Either way the mutex is held
                    during the scan to avoid a race with concurrent GAQ
                    updates.

  @return possibly updated current_lwm
*/
longlong Mts_submode_logical_clock::get_lwm_timestamp(Relay_log_info *rli,
                                                      bool need_lock) {
  longlong lwm_estim;
  Slave_job_group *ptr_g = nullptr;
  bool is_stale = false;

  // Take the mutex only when the caller does not hold it (see need_lock).
  if (!need_lock) mysql_mutex_lock(&rli->mts_gaq_LOCK);

  /*
    Make the "stable" LWM-based estimate which will be compared
    against the cached "instant" value.
  */
  lwm_estim = rli->gaq->lwm.sequence_number;
  /*
    timestamp continuity invariant: if the queue has any item
    its timestamp is greater by one than the estimate.
  */
  DBUG_ASSERT(lwm_estim == SEQ_UNINIT || rli->gaq->empty() ||
              lwm_estim + 1 ==
                  rli->gaq->get_job_group(rli->gaq->entry)->sequence_number);

  last_lwm_index = rli->gaq->find_lwm(
      &ptr_g,
      /*
        The undefined "stable" forces the scan's restart
        as the stale value does.
        Note is_stale is assigned only when lwm_estim is defined, because
        of the short-circuit ||; otherwise it stays false.
      */
      lwm_estim == SEQ_UNINIT ||
              (is_stale = clock_leq(last_lwm_timestamp, lwm_estim))
          ? rli->gaq->entry
          : last_lwm_index);
  /*
    if the returned index is sane update the timestamp.
    (rli->gaq->size appears to serve as the "not found" index -- confirm
    against Slave_committed_queue::find_lwm.)
  */
  if (last_lwm_index != rli->gaq->size) {
    // non-decreasing lwm invariant
    DBUG_ASSERT(clock_leq(last_lwm_timestamp, ptr_g->sequence_number));

    last_lwm_timestamp = ptr_g->sequence_number;
  } else if (is_stale) {
    // Nothing found, but the cached value is stale: raise it to the
    // stable estimate.
    last_lwm_timestamp.store(lwm_estim);
  }

  if (!need_lock) mysql_mutex_unlock(&rli->mts_gaq_LOCK);

  return last_lwm_timestamp;
}
527
/**
  The method implements logical timestamp conflict detection
  and resolution through waiting by the calling thread.
  The conflict or waiting condition is like the following

    lwm < last_committed,

  where lwm is a minimum logical timestamp of committed transactions.
  Since the lwm's exact value is not always available, its pessimistic
  estimate (an old version) is improved (get_lwm_timestamp()) as the
  first step, before actually committing to wait.

  Special cases include:

  When @c last_committed_arg is uninitialized the calling thread must
  proceed without waiting for anyone. Any possible dependency with unknown
  commit parent transaction shall be sorted out by the parent;

  When the gaq index is subsequent to the last lwm index
  there's no dependency of the current transaction with any regardless of
  lwm timestamp should it be SEQ_UNINIT.
  Consequently when GAQ consists of just one item there's none to wait.
  Such latter case is left to the caller to handle.

  @note The caller must make sure the current transaction won't be waiting
        for itself. That is, the method should not be called by a Worker
        whose group assignment is in the GAQ front item.

  The time spent waiting is added to rli->mts_total_wait_overlap.

  @param rli relay log info of coordinator
  @param last_committed_arg logical timestamp of a parent transaction
  @return false as success,
          true when the error flag is raised or
          the caller thread is found killed.
*/
bool Mts_submode_logical_clock::wait_for_last_committed_trx(
    Relay_log_info *rli, longlong last_committed_arg) {
  THD *thd = rli->info_thd;

  DBUG_TRACE;

  if (last_committed_arg == SEQ_UNINIT) return false;

  mysql_mutex_lock(&rli->mts_gaq_LOCK);

  DBUG_ASSERT(min_waited_timestamp == SEQ_UNINIT);

  min_waited_timestamp.store(last_committed_arg);
  /*
    This transaction is a candidate for insertion into the waiting list.
    That fact is described by setting min_waited_timestamp.
    When the candidate won't make it, min_waited_timestamp is reset at once
    while the mutex is held.
  */
  // get_lwm_timestamp(rli, true): the mutex is already held here.
  if ((!rli->info_thd->killed && !is_error) &&
      !clock_leq(last_committed_arg, get_lwm_timestamp(rli, true))) {
    PSI_stage_info old_stage;
    struct timespec ts[2];
    set_timespec_nsec(&ts[0], 0);

    DBUG_ASSERT(rli->gaq->len >= 2);  // there's someone to wait

    thd->ENTER_COND(&rli->logical_clock_cond, &rli->mts_gaq_LOCK,
                    &stage_worker_waiting_for_commit_parent, &old_stage);
    do {
      mysql_cond_wait(&rli->logical_clock_cond, &rli->mts_gaq_LOCK);
    } while ((!rli->info_thd->killed && !is_error) &&
             !clock_leq(last_committed_arg, estimate_lwm_timestamp()));
    min_waited_timestamp.store(SEQ_UNINIT);  // reset waiting flag
    mysql_mutex_unlock(&rli->mts_gaq_LOCK);
    thd->EXIT_COND(&old_stage);
    set_timespec_nsec(&ts[1], 0);
    rli->mts_total_wait_overlap += diff_timespec(&ts[1], &ts[0]);
  } else {
    min_waited_timestamp.store(SEQ_UNINIT);
    mysql_mutex_unlock(&rli->mts_gaq_LOCK);
  }

  return rli->info_thd->killed || is_error;
}
607
/**
  Does necessary arrangement before scheduling next event.
  The method computes the meta-group status of the being scheduled
  transaction represented by the event argument. When the status
  is found OUT (of the current meta-group) as encoded as is_new_group == true
  the global Scheduler (Coordinator thread) requests full synchronization
  with all Workers.
  The current being assigned group descriptor gets associated with
  the group's logical timestamp aka sequence_number.

  @return 0 if no error or slave has been killed gracefully;
          ER_MTS_CANT_PARALLEL when the logical timestamps are inconsistent;
          ER_MTS_INCONSISTENT_DATA when synchronization with the Workers
          failed;
          -1 when waiting for the parent transaction was interrupted
          (the thread was killed or the error flag was raised).
*/
int Mts_submode_logical_clock::schedule_next_event(Relay_log_info *rli,
                                                   Log_event *ev) {
  // Sequence number of the previously scheduled event.
  longlong last_sequence_number = sequence_number;
  bool gap_successor = false;

  DBUG_TRACE;
  // We should check if the SQL thread was already killed before we schedule
  // the next transaction
  if (sql_slave_killed(rli->info_thd, rli)) return 0;

  Slave_job_group *ptr_group =
      rli->gaq->get_job_group(rli->gaq->assigned_group_index);
  /*
    A group id updater must satisfy the following:
    - A query log event ("BEGIN" ) or a GTID EVENT
    - A DDL or an implicit DML commit.
  */
  switch (ev->get_type_code()) {
    case binary_log::GTID_LOG_EVENT:
    case binary_log::ANONYMOUS_GTID_LOG_EVENT:
      // TODO: control continuity
      ptr_group->sequence_number = sequence_number =
          static_cast<Gtid_log_event *>(ev)->sequence_number;
      ptr_group->last_committed = last_committed =
          static_cast<Gtid_log_event *>(ev)->last_committed;
      break;

    default:

      sequence_number = last_committed = SEQ_UNINIT;

      break;
  }

  DBUG_PRINT("info", ("sequence_number %lld, last_committed %lld",
                      sequence_number, last_committed));

  if (first_event) {
    first_event = false;
  } else {
    if (unlikely(clock_leq(sequence_number, last_committed) &&
                 last_committed != SEQ_UNINIT)) {
      /* inconsistent (buggy) timestamps */
      LogErr(ERROR_LEVEL, ER_RPL_INCONSISTENT_TIMESTAMPS_IN_TRX,
             sequence_number, last_committed);
      return ER_MTS_CANT_PARALLEL;
    }
    if (unlikely(clock_leq(sequence_number, last_sequence_number) &&
                 sequence_number != SEQ_UNINIT)) {
      /* inconsistent (buggy) timestamps */
      LogErr(ERROR_LEVEL, ER_RPL_INCONSISTENT_SEQUENCE_NO_IN_TRX,
             sequence_number, last_sequence_number);
      return ER_MTS_CANT_PARALLEL;
    }
    /*
      Being scheduled transaction sequence may have gaps, even in
      relay log. In such case a transaction that succeeds a gap will
      wait for all earlier that were scheduled to finish. It's marked
      as gap successor now.
    */
    static_assert(SEQ_UNINIT == 0, "");
    if (unlikely(sequence_number > last_sequence_number + 1)) {
      /*
        TODO: account autopositioning
        DBUG_ASSERT(rli->replicate_same_server_id);
      */
      DBUG_PRINT("info", ("sequence_number gap found, "
                          "last_sequence_number %lld, sequence_number %lld",
                          last_sequence_number, sequence_number));
      gap_successor = true;
    }
  }

  /*
    The new group flag is practically the same as the force flag
    when up to indicate synchronization with Workers.

    NOTE(review): first_event has already been cleared above, so that term
    is always false here. The first event after construction is still
    caught by the last_sequence_number == SEQ_UNINIT term, because
    sequence_number starts as SEQ_UNINIT.
  */
  is_new_group =
      (/* First event after a submode switch; */
       first_event ||
       /* Require a fresh group to be started; */
       // todo: turn `force_new_group' into sequence_number == SEQ_UNINIT
       // condition
       force_new_group ||
       /* Rewritten event without commit point timestamp (todo: find use case)
        */
       sequence_number == SEQ_UNINIT ||
       /*
         undefined parent (e.g the very first trans from the master),
         or old master.
       */
       last_committed == SEQ_UNINIT ||
       /*
         When gap successor depends on a gap before it the scheduler has
         to serialize this transaction execution with previously
         scheduled ones. Below for simplicity it's assumed that such
         gap-dependency is always the case.
       */
       gap_successor ||
       /*
         previous group did not have sequence number assigned.
         Its execution must be finished until the current group
         can be assigned.
         Dependency of the current group on the previous
         can't be tracked. So let's wait till the former is over.
       */
       last_sequence_number == SEQ_UNINIT);
  /*
    The coordinator waits till all transactions on which the current one
    depends on are applied.
  */
  if (!is_new_group) {
    longlong lwm_estimate = estimate_lwm_timestamp();

    if (!clock_leq(last_committed, lwm_estimate) &&
        rli->gaq->assigned_group_index != rli->gaq->entry) {
      /*
        "Unlikely" branch.

        The following block improves possibly stale lwm and when the
        waiting condition stays, recompute min_waited_timestamp and go
        waiting.
        At awakening set min_waited_timestamp to commit_parent in the
        subsequent GAQ index (could be NIL).
      */
      if (wait_for_last_committed_trx(rli, last_committed)) {
        /*
          MTS was waiting for a dependent transaction to finish but either it
          has failed or the applier was requested to stop. In any case, this
          transaction wasn't started yet and should not warn about the
          coordinator stopping in a middle of a transaction to avoid polluting
          the server error log.
        */
        rli->reported_unsafe_warning = true;
        return -1;
      }
      /*
        Making the slave's max last committed (lwm) to satisfy this
        transaction's scheduling condition.
        NOTE(review): gap_successor implies is_new_group, so inside this
        !is_new_group branch it is always false and the assignment below
        never runs.
      */
      if (gap_successor) last_lwm_timestamp = sequence_number - 1;
      DBUG_ASSERT(!clock_leq(sequence_number, estimate_lwm_timestamp()));
    }

    delegated_jobs++;

    DBUG_ASSERT(!force_new_group);
  } else {
    DBUG_ASSERT(delegated_jobs >= jobs_done);
    DBUG_ASSERT(is_error || (rli->gaq->len + jobs_done == 1 + delegated_jobs));
    DBUG_ASSERT(rli->mts_group_status == Relay_log_info::MTS_IN_GROUP);

    /*
      Under the new group fall the following use cases:
      - events from an OLD (sequence_number unaware) master;
      - malformed (missed BEGIN or GTID_NEXT) group incl. its
        particular form of CREATE..SELECT..from..@user_var (or rand- and
        int- var in place of @user- var).
        The malformed group is handled exceptionally each event is executed
        as a solitary group yet by the same (zero id) worker.
    */
    if (-1 == wait_for_workers_to_finish(rli)) return ER_MTS_INCONSISTENT_DATA;

    rli->mts_group_status = Relay_log_info::MTS_IN_GROUP;  // wait set it to NOT
    DBUG_ASSERT(min_waited_timestamp == SEQ_UNINIT);
    /*
      the instant last lwm timestamp must reset when force flag is up.
    */
    rli->gaq->lwm.sequence_number = last_lwm_timestamp = SEQ_UNINIT;
    delegated_jobs = 1;
    jobs_done = 0;
    force_new_group = false;
    /*
      Not sequenced event can be followed with a logically relating
      e.g User var to be followed by CREATE table.
      It's supported to be executed in one-by-one fashion.
      Todo: remove with the event group parser worklog.
    */
    if (sequence_number == SEQ_UNINIT && last_committed == SEQ_UNINIT)
      rli->last_assigned_worker = *rli->workers.begin();
  }

#ifndef DBUG_OFF
  mysql_mutex_lock(&rli->mts_gaq_LOCK);
  DBUG_ASSERT(is_error || (rli->gaq->len + jobs_done == delegated_jobs));
  mysql_mutex_unlock(&rli->mts_gaq_LOCK);
#endif
  return 0;
}
810
/**
  Logic to attach temporary tables from the coordinator to the worker
  thread upon event execution.

  While holding the coordinator's mts_temp_table_LOCK, every table in the
  coordinator THD's temporary-table list is examined. A table is unlinked
  from that list and moved to the worker THD when the pair returned by
  get_server_and_thread_id() matches the worker's server_id and
  variables.pseudo_thread_id. The coordinator's list head is left pointing
  at the first table that was not moved, or nullptr when every table was
  moved.

  Nothing is done for non-worker threads or for events that start or end
  a group.

  @param thd THD instance
  @param rli Relay_log_info instance (of the worker; its c_rli is the
             coordinator's)
  @param ev  Query_log_event that is being applied
*/
void Mts_submode_logical_clock::attach_temp_tables(THD *thd,
                                                   const Relay_log_info *rli,
                                                   Query_log_event *ev) {
  bool shifted = false;
  TABLE *table, *cur_table;
  DBUG_TRACE;
  if (!is_mts_worker(thd) || (ev->ends_group() || ev->starts_group())) return;
  /* fetch coordinator's rli */
  Relay_log_info *c_rli = static_cast<const Slave_worker *>(rli)->c_rli;
  DBUG_ASSERT(!thd->temporary_tables);
  mysql_mutex_lock(&c_rli->mts_temp_table_LOCK);
  if (!(table = c_rli->info_thd->temporary_tables)) {
    mysql_mutex_unlock(&c_rli->mts_temp_table_LOCK);
    return;
  }
  // Detach the whole list; the head is re-set below to the first table
  // that stays with the coordinator.
  c_rli->info_thd->temporary_tables = nullptr;
  do {
    /* store the current table */
    cur_table = table;
    /* move the table pointer to next in list, so that we can isolate the
       current table */
    table = table->next;
    std::pair<uint, my_thread_id> st_id_pair =
        get_server_and_thread_id(cur_table);
    if (thd->server_id == st_id_pair.first &&
        thd->variables.pseudo_thread_id == st_id_pair.second) {
      /* short the list singling out the current table */
      if (cur_table->prev)  // not the first node
        cur_table->prev->next = cur_table->next;
      if (cur_table->next)  // not the last node
        cur_table->next->prev = cur_table->prev;
      /* isolate the table */
      cur_table->prev = nullptr;
      cur_table->next = nullptr;
      mts_move_temp_tables_to_thd(thd, cur_table);
    } else if (!shifted) {
      /* We must shift the C->temp_table pointer to the first table unused in
         this iteration. If all the tables have been used C->temp_tables will
         point to NULL */
      c_rli->info_thd->temporary_tables = cur_table;
      shifted = true;
    }
  } while (table);
  mysql_mutex_unlock(&c_rli->mts_temp_table_LOCK);
}
864
865 /**
866 Logic to detach the temporary tables from the worker threads upon
867 event execution.
868 @param thd THD instance
869 @param rli Relay_log_info instance
870 */
detach_temp_tables(THD * thd,const Relay_log_info * rli,Query_log_event *)871 void Mts_submode_logical_clock::detach_temp_tables(THD *thd,
872 const Relay_log_info *rli,
873 Query_log_event *) {
874 DBUG_TRACE;
875 if (!is_mts_worker(thd)) return;
876 /*
877 Here in detach section we will move the tables from the worker to the
878 coordinaor thread. Since coordinator is shared we need to make sure that
879 there are no race conditions which may lead to assert failures and
880 non-deterministic results.
881 */
882 Relay_log_info *c_rli = static_cast<const Slave_worker *>(rli)->c_rli;
883 mysql_mutex_lock(&c_rli->mts_temp_table_LOCK);
884 mts_move_temp_tables_to_thd(c_rli->info_thd, thd->temporary_tables);
885 mysql_mutex_unlock(&c_rli->mts_temp_table_LOCK);
886 thd->temporary_tables = nullptr;
887 }
888
889 /**
890 Logic to get least occupied worker when the sql mts_submode= master_parallel
891 @param rli relay log info of coordinator
892 @param ws array of worker threads
893 @param ev event for which we are searching for a worker.
894 @return slave worker thread or NULL when coordinator is killed by any worker.
895 */
896
get_least_occupied_worker(Relay_log_info * rli,Slave_worker_array * ws MY_ATTRIBUTE ((unused)),Log_event * ev)897 Slave_worker *Mts_submode_logical_clock::get_least_occupied_worker(
898 Relay_log_info *rli, Slave_worker_array *ws MY_ATTRIBUTE((unused)),
899 Log_event *ev) {
900 Slave_worker *worker = nullptr;
901 PSI_stage_info *old_stage = nullptr;
902 THD *thd = rli->info_thd;
903 DBUG_TRACE;
904 #ifndef DBUG_OFF
905
906 if (DBUG_EVALUATE_IF("mts_distribute_round_robin", 1, 0)) {
907 worker = ws->at(w_rr % ws->size());
908 LogErr(INFORMATION_LEVEL, ER_RPL_WORKER_ID_IS, worker->id,
909 static_cast<ulong>(w_rr % ws->size()));
910 DBUG_ASSERT(worker != nullptr);
911 return worker;
912 }
913 Slave_committed_queue *gaq = rli->gaq;
914 Slave_job_group *ptr_group;
915 ptr_group = gaq->get_job_group(rli->gaq->assigned_group_index);
916 #endif
917 /*
918 The scheduling works as follows, in this sequence
919 -If this is an internal event of a transaction use the last assigned
920 worker
921 -If the i-th transaction is being scheduled in this group where "i" <=
922 number of available workers then schedule the events to the consecutive
923 workers
924 -If the i-th transaction is being scheduled in this group where "i" >
925 number of available workers then schedule this to the forst worker that
926 becomes free.
927 */
928 if (rli->last_assigned_worker) {
929 worker = rli->last_assigned_worker;
930 DBUG_ASSERT(ev->get_type_code() != binary_log::USER_VAR_EVENT ||
931 worker->id == 0 || rli->curr_group_seen_begin ||
932 rli->curr_group_seen_gtid);
933 } else {
934 worker = get_free_worker(rli);
935
936 DBUG_ASSERT(ev->get_type_code() != binary_log::USER_VAR_EVENT ||
937 rli->curr_group_seen_begin || rli->curr_group_seen_gtid);
938
939 if (worker == nullptr) {
940 struct timespec ts[2];
941
942 set_timespec_nsec(&ts[0], 0);
943 // Update thd info as waiting for workers to finish.
944 thd->enter_stage(&stage_slave_waiting_for_workers_to_process_queue,
945 old_stage, __func__, __FILE__, __LINE__);
946 while (!worker && !thd->killed) {
947 /*
948 Busy wait with yielding thread control before to next attempt
949 to find a free worker. As of current, a worker
950 can't have more than one assigned group of events in its
951 queue.
952
953 todo: replace this At-Most-One assignment policy with
954 First Available Worker as
955 this method clearly can't be considered as optimal.
956 */
957 #if !defined(_WIN32)
958 sched_yield();
959 #else
960 my_sleep(rli->mts_coordinator_basic_nap);
961 #endif
962 worker = get_free_worker(rli);
963 }
964 THD_STAGE_INFO(thd, *old_stage);
965 set_timespec_nsec(&ts[1], 0);
966 rli->mts_total_wait_worker_avail += diff_timespec(&ts[1], &ts[0]);
967 rli->mts_wq_no_underrun_cnt++;
968 /*
969 Even OPTION_BEGIN is set, the 'BEGIN' event is not dispatched to
970 any worker thread. So The flag is removed and Coordinator thread
971 will not try to finish the group before abort.
972 */
973 if (worker == nullptr)
974 rli->info_thd->variables.option_bits &= ~OPTION_BEGIN;
975 }
976 if (rli->get_commit_order_manager() != nullptr && worker != nullptr)
977 rli->get_commit_order_manager()->register_trx(worker);
978 }
979
980 DBUG_ASSERT(ptr_group);
981 // assert that we have a worker thread for this event or the slave has
982 // stopped.
983 DBUG_ASSERT(worker != nullptr || thd->killed);
984 /* The master my have send db partition info. make sure we never use them*/
985 if (ev->get_type_code() == binary_log::QUERY_EVENT)
986 static_cast<Query_log_event *>(ev)->mts_accessed_dbs = 0;
987
988 return worker;
989 }
990
991 /**
992 Protected method to fetch a worker having no events assigned.
993 The method is supposed to be called by Coordinator, therefore
994 comparison like w_i->jobs.len == 0 must (eventually) succeed.
995
996 todo: consider to optimize scan that is getting more expensive with
997 more # of Workers.
998
999 @return a pointer to Worker or NULL if none is free.
1000 */
get_free_worker(Relay_log_info * rli)1001 Slave_worker *Mts_submode_logical_clock::get_free_worker(Relay_log_info *rli) {
1002 for (Slave_worker **it = rli->workers.begin(); it != rli->workers.end();
1003 ++it) {
1004 Slave_worker *w_i = *it;
1005 if (w_i->jobs.len == 0) return w_i;
1006 }
1007 return nullptr;
1008 }
1009
1010 /**
1011 Waits for slave workers to finish off the pending tasks before returning.
1012 Used in this submode to make sure that all assigned jobs have been done.
1013
1014 @param rli coordinator rli.
1015 @param ignore worker to ignore.
1016 @return -1 for error.
1017 0 no error.
1018 */
wait_for_workers_to_finish(Relay_log_info * rli,MY_ATTRIBUTE ((unused))Slave_worker * ignore)1019 int Mts_submode_logical_clock::wait_for_workers_to_finish(
1020 Relay_log_info *rli, MY_ATTRIBUTE((unused)) Slave_worker *ignore) {
1021 PSI_stage_info *old_stage = nullptr;
1022 THD *thd = rli->info_thd;
1023 DBUG_TRACE;
1024 DBUG_PRINT("info", ("delegated %d, jobs_done %d", delegated_jobs, jobs_done));
1025 // Update thd info as waiting for workers to finish.
1026 thd->enter_stage(&stage_slave_waiting_for_workers_to_process_queue, old_stage,
1027 __func__, __FILE__, __LINE__);
1028 while (delegated_jobs > jobs_done && !thd->killed && !is_error) {
1029 // Todo: consider to replace with a. GAQ::get_lwm_timestamp() or
1030 // b. (better) pthread wait+signal similarly to DB type.
1031 if (mts_checkpoint_routine(rli, true)) return -1;
1032 }
1033
1034 // Check if there is a failure on a not-ignored Worker
1035 for (Slave_worker **it = rli->workers.begin(); it != rli->workers.end();
1036 ++it) {
1037 Slave_worker *w_i = *it;
1038 if (w_i->running_status != Slave_worker::RUNNING) return -1;
1039 }
1040
1041 DBUG_EXECUTE_IF("wait_for_workers_to_finish_after_wait", {
1042 const char act[] = "now WAIT_FOR coordinator_continue";
1043 DBUG_ASSERT(!debug_sync_set_action(rli->info_thd, STRING_WITH_LEN(act)));
1044 });
1045
1046 // The current commit point sequence may end here (e.g Rotate to new log)
1047 rli->gaq->lwm.sequence_number = SEQ_UNINIT;
1048 // Restore previous info.
1049 THD_STAGE_INFO(thd, *old_stage);
1050 DBUG_PRINT("info", ("delegated %d, jobs_done %d, Workers have finished their"
1051 " jobs",
1052 delegated_jobs, jobs_done));
1053 rli->mts_group_status = Relay_log_info::MTS_NOT_IN_GROUP;
1054 return !thd->killed && !is_error ? 0 : -1;
1055 }
1056
1057 /**
1058 Protected method to fetch the server_id and pseudo_thread_id from a
1059 temporary table
1060 @param table instance pointer of TABLE structure.
1061 @return std:pair<uint, my_thread_id>
1062 @note It is the caller's responsibility to make sure we call this
1063 function only for temp tables.
1064 */
1065 std::pair<uint, my_thread_id>
get_server_and_thread_id(TABLE * table)1066 Mts_submode_logical_clock::get_server_and_thread_id(TABLE *table) {
1067 DBUG_TRACE;
1068 const char *extra_string = table->s->table_cache_key.str;
1069 size_t extra_string_len = table->s->table_cache_key.length;
1070 // assert will fail when called with non temporary tables.
1071 DBUG_ASSERT(table->s->table_cache_key.length > 0);
1072 std::pair<uint, my_thread_id> ret_pair = std::make_pair(
1073 /* last 8 bytes contains the server_id + pseudo_thread_id */
1074 // fetch first 4 bytes to get the server id.
1075 uint4korr(extra_string + extra_string_len - 8),
1076 /* next 4 bytes contains the pseudo_thread_id */
1077 uint4korr(extra_string + extra_string_len - 4));
1078 return ret_pair;
1079 }
1080