/* Copyright (c) 2013, 2020, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */

#include "sql/rpl_mts_submode.h"

#include <limits.h>
#include <string.h>
#include <time.h>
#include <memory>

#include "lex_string.h"
#include "libbinlogevents/include/compression/iterator.h"
#include "m_string.h"
#include "my_byteorder.h"
#include "my_compiler.h"
#include "my_dbug.h"
#include "my_inttypes.h"
#include "my_loglevel.h"
#include "my_systime.h"
#include "my_thread.h"
#include "mysql/components/services/log_builtins.h"
#include "mysql/components/services/psi_stage_bits.h"
#include "mysql/psi/mysql_cond.h"
#include "mysql/psi/mysql_mutex.h"
#include "mysqld_error.h"
#include "sql/binlog_reader.h"
#include "sql/debug_sync.h"
#include "sql/log.h"
#include "sql/log_event.h"  // Query_log_event
#include "sql/mdl.h"
#include "sql/mysqld.h"  // stage_worker_....
#include "sql/query_options.h"
#include "sql/rpl_filter.h"
#include "sql/rpl_rli.h"      // Relay_log_info
#include "sql/rpl_rli_pdb.h"  // db_worker_hash_entry
#include "sql/rpl_slave.h"
#include "sql/rpl_slave_commit_order_manager.h"  // Commit_order_manager
#include "sql/sql_class.h"                       // THD
#include "sql/system_variables.h"
#include "sql/table.h"

/**
 Does the necessary arrangement before scheduling the next event.
 @return 1 on error
         0 on success
*/
int Mts_submode_database::schedule_next_event(Relay_log_info *, Log_event *) {
  /* nothing to do here */
  return 0;
}

/**
  Logic to attach temporary tables to the worker thread.
*/
void Mts_submode_database::attach_temp_tables(THD *thd, const Relay_log_info *,
                                              Query_log_event *ev) {
  int i, parts;
  DBUG_TRACE;
  if (!is_mts_worker(thd) || (ev->ends_group() || ev->starts_group())) return;
  DBUG_ASSERT(!thd->temporary_tables);
  // when the event accesses more than the max number of dbs, just one
  // special partition is locked
  parts = ((ev->mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
               ? 1
               : ev->mts_accessed_dbs);
  for (i = 0; i < parts; i++) {
    mts_move_temp_tables_to_thd(
        thd, ev->mts_assigned_partitions[i]->temporary_tables);
    ev->mts_assigned_partitions[i]->temporary_tables = nullptr;
  }
}

/**
   Function called by the Coordinator when it identifies an event
   requiring sequential execution.
   Creating a sequential context for the event includes waiting
   for the tasks assigned to Workers to be completed and for their
   resources, such as temporary tables, to be returned to the
   Coordinator's repository.
   Once all Workers have been waited for, the Coordinator changes its
   group status.

   @param  rli     Relay_log_info instance of Coordinator
   @param  ignore  Optional Worker instance pointer if the sequential context
                   is established on behalf of the ignore Worker. Its
                   resources are to be retained.

   @note   Resources that are not occupied by Workers, such as
           a list of temporary tables held in unused (zero-usage) records
           of APH, are relocated to the Coordinator placeholder.

   @return non-negative number of partitions released by Workers
           (one partition can be counted multiple times, once per Worker)

           or -1 to indicate a failure on a not-ignored Worker,
           as indicated by its running_status, so synchronization can't
           succeed.
*/

int Mts_submode_database::wait_for_workers_to_finish(Relay_log_info *rli,
                                                     Slave_worker *ignore) {
  uint ret = 0;
  THD *thd = rli->info_thd;
  bool cant_sync = false;
  char llbuf[22];

  DBUG_TRACE;

  llstr(rli->get_event_relay_log_pos(), llbuf);
  DBUG_PRINT("info", ("Coordinator and workers enter synchronization "
                      "procedure when scheduling event relay-log: %s "
                      "pos: %s",
                      rli->get_event_relay_log_name(), llbuf));

  mysql_mutex_lock(&rli->slave_worker_hash_lock);

  for (const auto &key_and_value : rli->mapping_db_to_worker) {
    db_worker_hash_entry *entry = key_and_value.second.get();
    DBUG_ASSERT(entry);

    // the ignore Worker retains its active resources
    if (ignore && entry->worker == ignore && entry->usage > 0) {
      continue;
    }

    if (entry->usage > 0 && !thd->killed) {
      PSI_stage_info old_stage;
      Slave_worker *w_entry = entry->worker;

      entry->worker = nullptr;  // mark Worker to signal when usage drops to 0
      thd->ENTER_COND(
          &rli->slave_worker_hash_cond, &rli->slave_worker_hash_lock,
          &stage_slave_waiting_worker_to_release_partition, &old_stage);
      do {
        mysql_cond_wait(&rli->slave_worker_hash_cond,
                        &rli->slave_worker_hash_lock);
        DBUG_PRINT("info", ("Either got awakened or notified: "
                            "entry %p, usage %lu, worker %lu",
                            entry, entry->usage, w_entry->id));
      } while (entry->usage != 0 && !thd->killed);
      entry->worker =
          w_entry;  // restoring last association, needed only for assert
      mysql_mutex_unlock(&rli->slave_worker_hash_lock);
      thd->EXIT_COND(&old_stage);
      ret++;
    } else {
      mysql_mutex_unlock(&rli->slave_worker_hash_lock);
    }
    // resources relocation
    mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
    entry->temporary_tables = nullptr;
    if (entry->worker->running_status != Slave_worker::RUNNING)
      cant_sync = true;
    mysql_mutex_lock(&rli->slave_worker_hash_lock);
  }

  mysql_mutex_unlock(&rli->slave_worker_hash_lock);

  if (!ignore) {
    DBUG_PRINT("info", ("Coordinator synchronized with workers, "
                        "waited entries: %d, cant_sync: %d",
                        ret, cant_sync));

    rli->mts_group_status = Relay_log_info::MTS_NOT_IN_GROUP;
  }

  return !cant_sync ? ret : -1;
}
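
/*
  Illustration (not part of the server): a minimal sketch of how a
  coordinator-side caller might use wait_for_workers_to_finish() before
  applying an event that must run serially. The driver function
  apply_serial_event() is hypothetical; only wait_for_workers_to_finish(),
  its -1 failure convention, and Log_event::apply_event() come from the
  surrounding code base.

    int apply_serial_event(Relay_log_info *rli, Mts_submode_database *mode,
                           Log_event *ev) {
      // Drain Workers and reclaim their temporary tables first.
      if (mode->wait_for_workers_to_finish(rli, nullptr) == -1)
        return 1;  // a Worker failed: synchronization cannot succeed
      return ev->apply_event(rli);  // now safe to execute sequentially
    }
*/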

bool Mts_submode_database::unfold_transaction_payload_event(
    Format_description_event &fde, Transaction_payload_log_event &tple,
    std::vector<Log_event *> &events) {
  bool error = false;
  /*
    Disable checksums - there are no checksums for events inside the tple;
    otherwise, the last 4 bytes would be truncated.

    We do this by copying the fdle from the rli. Then we disable the checksum
    in the copy. Then we use it to decode the events in the payload instead
    of the original fdle.

    We allocate the fdle copy on the stack.

    TODO: simplify this by breaking the binlog_event_deserialize API
    and making it take a single boolean instead that states whether the
    event has a checksum in it or not.
  */
  Format_description_log_event fdle(fde.reader().buffer(), &fde);
  fdle.footer()->checksum_alg = binary_log::BINLOG_CHECKSUM_ALG_OFF;
  fdle.register_temp_buf(const_cast<char *>(fde.reader().buffer()), false);

  // unpack the event
  binary_log::transaction::compression::Iterable_buffer it(
      tple.get_payload(), tple.get_payload_size(), tple.get_uncompressed_size(),
      tple.get_compression_type());

  for (auto ptr : it) {
    Log_event *next = nullptr;
    size_t event_len = uint4korr(ptr + EVENT_LEN_OFFSET);
    if (binlog_event_deserialize(reinterpret_cast<const unsigned char *>(ptr),
                                 event_len, &fdle, true, &next)) {
      error = true; /* purecov: inspected */
      break;        /* purecov: inspected */
    } else {
      DBUG_ASSERT(next != nullptr);
      events.push_back(next);
    }
  }

  return error;
}
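
/*
  Illustration (not part of the server): pseudocode for consuming the events
  unfolded by the helper above. It assumes the helper is reachable from the
  caller (it may well be private to this class), and the function name
  inspect_payload() is hypothetical. The caller owns the unfolded events and
  must delete them, exactly as set_multi_threaded_applier_context() below
  does.

    void inspect_payload(Mts_submode_database *mode, Relay_log_info &rli,
                         Transaction_payload_log_event &tple) {
      std::vector<Log_event *> events;
      if (!mode->unfold_transaction_payload_event(
              *rli.get_rli_description_event(), tple, events)) {
        for (Log_event *inner : events) {
          // ... examine inner->get_type_code(), etc. ...
          delete inner;  // release each unfolded event
        }
      }
    }
*/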

bool Mts_submode_database::set_multi_threaded_applier_context(
    const Relay_log_info &rli, Log_event &ev) {
  bool error = false;

  // if this is a transaction payload event, we need to set the proper
  // databases that its internal events update
  if (ev.get_type_code() == binary_log::TRANSACTION_PAYLOAD_EVENT) {
    Mts_db_names toset;
    bool max_mts_dbs_in_event = false;
    std::set<std::string> dbs;
    auto &tple = static_cast<Transaction_payload_log_event &>(ev);
    std::vector<Log_event *> events;
    unfold_transaction_payload_event(*rli.get_rli_description_event(), tple,
                                     events);

    for (auto inner : events) {
      Mts_db_names mts_dbs;

      // This transaction payload event is already marked to run in
      // isolation or the event being handled does not contain partition
      // information
      if (max_mts_dbs_in_event || !inner->contains_partition_info(true)) {
        delete inner;
        continue;
      }

      // The following queries should run in isolation, hence setting
      // OVER_MAX_DBS_IN_EVENT_MTS
      if ((inner->get_type_code() == binary_log::QUERY_EVENT)) {
        auto qev = static_cast<Query_log_event *>(inner);
        if (qev->is_query_prefix_match(STRING_WITH_LEN("XA COMMIT")) ||
            qev->is_query_prefix_match(STRING_WITH_LEN("XA ROLLBACK"))) {
          max_mts_dbs_in_event = true;
          delete inner;
          continue;
        }
      }

      // OK, now that we have ruled out the exceptions, let's handle the
      // databases in the inner event.
      inner->get_mts_dbs(&mts_dbs, rli.rpl_filter);

      // inner event is marked to run in isolation
      if (mts_dbs.num == OVER_MAX_DBS_IN_EVENT_MTS) {
        max_mts_dbs_in_event = true;
        delete inner;
        continue;
      }

      // iterate over the databases and add them to the set
      for (int i = 0; i < mts_dbs.num; i++) {
        dbs.insert(mts_dbs.name[i]);
        if (dbs.size() == MAX_DBS_IN_EVENT_MTS) {
          max_mts_dbs_in_event = true;
          break;
        }
      }

      // inner event not needed anymore. Delete.
      delete inner;
    }

    // now set the database information in the event
    if (max_mts_dbs_in_event) {
      toset.name[0] = "\0";
      toset.num = OVER_MAX_DBS_IN_EVENT_MTS;
    } else {
      int i = 0;
      // set the databases
      for (auto &db : dbs) toset.name[i++] = db.c_str();

      // set the number of databases
      toset.num = dbs.size();
    }

    // save the mts_dbs to the payload event
    tple.set_mts_dbs(toset);
  }

  return error;
}
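
/*
  Illustration (not part of the server): the net effect of the loop above on
  a payload whose inner events touch databases db1 and db2, with no isolation
  trigger (no XA COMMIT/ROLLBACK, fewer than MAX_DBS_IN_EVENT_MTS databases).
  The values are hypothetical.

    // After scanning the inner events, dbs == {"db1", "db2"}, hence:
    //   toset.num     == 2
    //   toset.name[0] == "db1"
    //   toset.name[1] == "db2"
    // Had an isolation trigger been seen, the result would instead be:
    //   toset.num == OVER_MAX_DBS_IN_EVENT_MTS   // run in isolation
*/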

/**
 Logic to detach the temporary tables from the worker threads upon
 event execution.
 @param thd THD instance
 @param rli Relay_log_info pointer
 @param ev  Query_log_event that is being applied
*/
void Mts_submode_database::detach_temp_tables(THD *thd,
                                              const Relay_log_info *rli,
                                              Query_log_event *ev) {
  DBUG_TRACE;
  if (!is_mts_worker(thd)) return;
  int parts = ((ev->mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
                   ? 1
                   : ev->mts_accessed_dbs);
  /*
    todo: optimize for the cases of

    a. one db
       Only detaching temporary_tables from thd to the entry would be
       required instead of the double-loop below.

    b. unchanged thd->temporary_tables.
       In such a case the involved entries would continue to hold the
       unmodified lists provided that the attach_ method does not
       destroy references to them.
  */
  for (int i = 0; i < parts; i++) {
    ev->mts_assigned_partitions[i]->temporary_tables = nullptr;
  }

  Rpl_filter *rpl_filter = rli->rpl_filter;
  for (TABLE *table = thd->temporary_tables; table;) {
    int i;
    const char *db_name = nullptr;

    // find which entry to go to
    for (i = 0; i < parts; i++) {
      db_name = ev->mts_accessed_db_names[i];
      if (!strlen(db_name)) break;
      // Only the default database is rewritten.
      if (!rpl_filter->is_rewrite_empty() && !strcmp(ev->get_db(), db_name)) {
        size_t dummy_len;
        const char *db_filtered =
            rpl_filter->get_rewrite_db(db_name, &dummy_len);
        // db_name != db_filtered means that db_name is rewritten.
        if (strcmp(db_name, db_filtered)) db_name = db_filtered;
      }
      if (strcmp(table->s->db.str, db_name) < 0)
        continue;
      else {
        // When rewrite db rules are used we cannot rely on the order of
        // mts_accessed_db_names elements.
        if (!rpl_filter->is_rewrite_empty() &&
            strcmp(table->s->db.str, db_name))
          continue;
        else
          break;
      }
    }
    DBUG_ASSERT(db_name &&
                (!strcmp(table->s->db.str, db_name) || !strlen(db_name)));
    DBUG_ASSERT(i < ev->mts_accessed_dbs);
    // table pointer is shifted inside the function
    table = mts_move_temp_table_to_entry(table, thd,
                                         ev->mts_assigned_partitions[i]);
  }

  DBUG_ASSERT(!thd->temporary_tables);
#ifndef DBUG_OFF
  for (int i = 0; i < parts; i++) {
    DBUG_ASSERT(!ev->mts_assigned_partitions[i]->temporary_tables ||
                !ev->mts_assigned_partitions[i]->temporary_tables->prev);
  }
#endif
}

/**
  Logic to get the least occupied worker when mts_submode = database
  @param ws  array of worker threads
  @return slave worker thread
 */
Slave_worker *Mts_submode_database::get_least_occupied_worker(
    Relay_log_info *, Slave_worker_array *ws, Log_event *) {
  long usage = LONG_MAX;
  Slave_worker **ptr_current_worker = nullptr, *worker = nullptr;

  DBUG_TRACE;

#ifndef DBUG_OFF

  if (DBUG_EVALUATE_IF("mts_distribute_round_robin", 1, 0)) {
    worker = ws->at(w_rr % ws->size());
    LogErr(INFORMATION_LEVEL, ER_RPL_WORKER_ID_IS, worker->id,
           static_cast<ulong>(w_rr % ws->size()));
    DBUG_ASSERT(worker != nullptr);
    return worker;
  }
#endif

  for (Slave_worker **it = ws->begin(); it != ws->end(); ++it) {
    ptr_current_worker = it;
    if ((*ptr_current_worker)->usage_partition <= usage) {
      worker = *ptr_current_worker;
      usage = (*ptr_current_worker)->usage_partition;
    }
  }
  DBUG_ASSERT(worker != nullptr);
  return worker;
}

/* Mts_submode_logical_clock default constructor */
Mts_submode_logical_clock::Mts_submode_logical_clock() {
  type = MTS_PARALLEL_TYPE_LOGICAL_CLOCK;
  first_event = true;
  force_new_group = false;
  is_new_group = true;
  delegated_jobs = 0;
  jobs_done = 0;
  last_lwm_timestamp = SEQ_UNINIT;
  last_lwm_index = INDEX_UNDEF;
  is_error = false;
  min_waited_timestamp = SEQ_UNINIT;
  last_committed = SEQ_UNINIT;
  sequence_number = SEQ_UNINIT;
}

/**
   The method finds the minimum logical timestamp (low-water-mark) of
   committed transactions.
   A successful search results in a pair of a logical timestamp value and a
   GAQ index that contains it. last_lwm_timestamp may still be raised even
   though the search does not find any satisfying running index. The search
   is implemented as headway scanning of GAQ from the stop position of a
   previous search (last_lwm_index). The cached (memorized) index value is
   considered stale when its timestamp falls below the current "stable" LWM:

        last_lwm_timestamp <= GAQ.lwm.sequence_number           (*)

   Staleness is caused by GAQ garbage collection that increments the rhs of
   (*), see @c move_queue_head(). When that's diagnosed, the search in GAQ
   needs restarting from the queue tail.

   Formally, the undefined cached value of last_lwm_timestamp is also stale.

   @verbatim
              the last time index containing lwm
                  +------+
                  | LWM  |
                  |  |   |
                  V  V   V
   GAQ:   xoooooxxxxxXXXXX...X
                ^   ^
                |   | LWM+1
                |
                +- the new current_lwm

         <---- logical (commit) time ----
   @endverbatim

   here `x' stands for committed, `X' for committed and discarded from
   the running range of the queue, `o' for not committed.

   @param  rli         Relay_log_info pointer
   @param  need_lock   Either the caller or the function must hold a mutex
                       to avoid a race with concurrent GAQ update.

   @return possibly updated current_lwm
*/
longlong Mts_submode_logical_clock::get_lwm_timestamp(Relay_log_info *rli,
                                                      bool need_lock) {
  longlong lwm_estim;
  Slave_job_group *ptr_g = nullptr;
  bool is_stale = false;

  if (!need_lock) mysql_mutex_lock(&rli->mts_gaq_LOCK);

  /*
    Make the "stable" LWM-based estimate which will be compared
    against the cached "instant" value.
  */
  lwm_estim = rli->gaq->lwm.sequence_number;
  /*
    Timestamp continuity invariant: if the queue has any item,
    its timestamp is greater by one than the estimate.
  */
  DBUG_ASSERT(lwm_estim == SEQ_UNINIT || rli->gaq->empty() ||
              lwm_estim + 1 ==
                  rli->gaq->get_job_group(rli->gaq->entry)->sequence_number);

  last_lwm_index = rli->gaq->find_lwm(
      &ptr_g,
      /*
        The undefined "stable" value forces the scan's restart,
        as the stale value does.
      */
      lwm_estim == SEQ_UNINIT ||
              (is_stale = clock_leq(last_lwm_timestamp, lwm_estim))
          ? rli->gaq->entry
          : last_lwm_index);
  /*
    If the returned index is sane, update the timestamp.
  */
  if (last_lwm_index != rli->gaq->size) {
    // non-decreasing lwm invariant
    DBUG_ASSERT(clock_leq(last_lwm_timestamp, ptr_g->sequence_number));

    last_lwm_timestamp = ptr_g->sequence_number;
  } else if (is_stale) {
    last_lwm_timestamp.store(lwm_estim);
  }

  if (!need_lock) mysql_mutex_unlock(&rli->mts_gaq_LOCK);

  return last_lwm_timestamp;
}
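
/*
  Illustration (not part of the server): a self-contained toy mirroring the
  LWM comparison used above. clock_leq() treats SEQ_UNINIT (== 0, see the
  static_assert further below) as "less than or equal to everything", which
  is why an undefined cached timestamp always counts as stale. This sketch
  re-implements that rule locally; the authoritative definition lives in
  rpl_mts_submode.h.

    #include <cassert>
    static const long long kSeqUninit = 0;  // stands in for SEQ_UNINIT
    static bool toy_clock_leq(long long a, long long b) {
      if (a == kSeqUninit) return true;   // undefined <= anything
      if (b == kSeqUninit) return false;  // defined > undefined
      return a <= b;
    }
    int main() {
      assert(toy_clock_leq(kSeqUninit, 7));  // stale: forces a scan restart
      assert(toy_clock_leq(5, 7));           // cached lwm behind stable lwm
      assert(!toy_clock_leq(9, 7));          // cached lwm still ahead
      return 0;
    }
*/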

/**
   The method implements logical timestamp conflict detection
   and resolution through waiting by the calling thread.
   The conflict or waiting condition is as follows:

           lwm < last_committed,

   where lwm is the minimum logical timestamp of committed transactions.
   Since the lwm's exact value is not always available, its pessimistic
   estimate (an old version) is improved (get_lwm_timestamp()) as the
   first step before the actual wait.

   Special cases include:

   When @c last_committed_arg is uninitialized, the calling thread must
   proceed without waiting for anyone. Any possible dependency on an unknown
   commit parent transaction shall be sorted out by the parent;

   When the gaq index is subsequent to the last lwm index,
   there's no dependency of the current transaction on any other, regardless
   of the lwm timestamp, even should it be SEQ_UNINIT.
   Consequently, when GAQ consists of just one item there's no one to wait
   for. The latter case is left to the caller to handle.

   @note The caller must make sure the current transaction won't be waiting
         for itself. That is, the method should not be called by a Worker
         whose group assignment is in the GAQ front item.

   @param rli relay log info of coordinator
   @param last_committed_arg  logical timestamp of a parent transaction
   @return false as success,
           true  when the error flag is raised or
                 the caller thread is found killed.
*/
bool Mts_submode_logical_clock::wait_for_last_committed_trx(
    Relay_log_info *rli, longlong last_committed_arg) {
  THD *thd = rli->info_thd;

  DBUG_TRACE;

  if (last_committed_arg == SEQ_UNINIT) return false;

  mysql_mutex_lock(&rli->mts_gaq_LOCK);

  DBUG_ASSERT(min_waited_timestamp == SEQ_UNINIT);

  min_waited_timestamp.store(last_committed_arg);
  /*
    This transaction is a candidate for insertion into the waiting list.
    That fact is described by incrementing waited_timestamp_cnt.
    When the candidate won't make it, the counter is decremented at once
    while the mutex is held.
  */
  if ((!rli->info_thd->killed && !is_error) &&
      !clock_leq(last_committed_arg, get_lwm_timestamp(rli, true))) {
    PSI_stage_info old_stage;
    struct timespec ts[2];
    set_timespec_nsec(&ts[0], 0);

    DBUG_ASSERT(rli->gaq->len >= 2);  // there's someone to wait for

    thd->ENTER_COND(&rli->logical_clock_cond, &rli->mts_gaq_LOCK,
                    &stage_worker_waiting_for_commit_parent, &old_stage);
    do {
      mysql_cond_wait(&rli->logical_clock_cond, &rli->mts_gaq_LOCK);
    } while ((!rli->info_thd->killed && !is_error) &&
             !clock_leq(last_committed_arg, estimate_lwm_timestamp()));
    min_waited_timestamp.store(SEQ_UNINIT);  // reset waiting flag
    mysql_mutex_unlock(&rli->mts_gaq_LOCK);
    thd->EXIT_COND(&old_stage);
    set_timespec_nsec(&ts[1], 0);
    rli->mts_total_wait_overlap += diff_timespec(&ts[1], &ts[0]);
  } else {
    min_waited_timestamp.store(SEQ_UNINIT);
    mysql_mutex_unlock(&rli->mts_gaq_LOCK);
  }

  return rli->info_thd->killed || is_error;
}
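
/*
  Illustration (not part of the server): a worked example of the waiting
  condition implemented above. The timestamps are hypothetical.

    Transactions in binlog order as (last_committed, sequence_number):
      T1 (0, 1), T2 (1, 2), T3 (1, 3), T4 (3, 4)

    Scheduling T3 with last_committed == 1:
      - once lwm >= 1 (T1 committed), clock_leq(1, lwm) holds, so T3
        proceeds without waiting; T2 and T3 can run in parallel.
    Scheduling T4 with last_committed == 3:
      - while lwm < 3 (T3 not yet committed), the caller blocks in
        mysql_cond_wait() until Workers advance the lwm to >= 3.
*/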

/**
 Does the necessary arrangement before scheduling the next event.
 The method computes the meta-group status of the transaction being
 scheduled, represented by the event argument. When the status
 is found OUT (of the current meta-group), encoded as is_new_group == true,
 the global Scheduler (Coordinator thread) requests full synchronization
 with all Workers.
 The group descriptor currently being assigned gets associated with
 the group's logical timestamp, aka sequence_number.

 @return ER_MTS_CANT_PARALLEL, ER_MTS_INCONSISTENT_DATA on error;
         0 if there is no error or the slave has been killed gracefully
 */
int Mts_submode_logical_clock::schedule_next_event(Relay_log_info *rli,
                                                   Log_event *ev) {
  longlong last_sequence_number = sequence_number;
  bool gap_successor = false;

  DBUG_TRACE;
  // We should check if the SQL thread was already killed before we schedule
  // the next transaction
  if (sql_slave_killed(rli->info_thd, rli)) return 0;

  Slave_job_group *ptr_group =
      rli->gaq->get_job_group(rli->gaq->assigned_group_index);
  /*
    A group id updater must satisfy the following:
    - be a query log event ("BEGIN") or a GTID event;
    - be a DDL or an implicit DML commit.
  */
  switch (ev->get_type_code()) {
    case binary_log::GTID_LOG_EVENT:
    case binary_log::ANONYMOUS_GTID_LOG_EVENT:
      // TODO: control continuity
      ptr_group->sequence_number = sequence_number =
          static_cast<Gtid_log_event *>(ev)->sequence_number;
      ptr_group->last_committed = last_committed =
          static_cast<Gtid_log_event *>(ev)->last_committed;
      break;

    default:

      sequence_number = last_committed = SEQ_UNINIT;

      break;
  }

  DBUG_PRINT("info", ("sequence_number %lld, last_committed %lld",
                      sequence_number, last_committed));

  if (first_event) {
    first_event = false;
  } else {
    if (unlikely(clock_leq(sequence_number, last_committed) &&
                 last_committed != SEQ_UNINIT)) {
      /* inconsistent (buggy) timestamps */
      LogErr(ERROR_LEVEL, ER_RPL_INCONSISTENT_TIMESTAMPS_IN_TRX,
             sequence_number, last_committed);
      return ER_MTS_CANT_PARALLEL;
    }
    if (unlikely(clock_leq(sequence_number, last_sequence_number) &&
                 sequence_number != SEQ_UNINIT)) {
      /* inconsistent (buggy) timestamps */
      LogErr(ERROR_LEVEL, ER_RPL_INCONSISTENT_SEQUENCE_NO_IN_TRX,
             sequence_number, last_sequence_number);
      return ER_MTS_CANT_PARALLEL;
    }
    /*
      The sequence of transactions being scheduled may have gaps, even in
      the relay log. In such a case a transaction that succeeds a gap will
      wait for all earlier ones that were scheduled to finish. It's marked
      as a gap successor now.
    */
    static_assert(SEQ_UNINIT == 0, "");
    if (unlikely(sequence_number > last_sequence_number + 1)) {
      /*
        TODO: account for autopositioning
        DBUG_ASSERT(rli->replicate_same_server_id);
      */
      DBUG_PRINT("info", ("sequence_number gap found, "
                          "last_sequence_number %lld, sequence_number %lld",
                          last_sequence_number, sequence_number));
      gap_successor = true;
    }
  }

  /*
    The new group flag is practically the same as the force flag
    when raised to indicate synchronization with Workers.
  */
  is_new_group =
      (/* First event after a submode switch; */
       first_event ||
       /* Require a fresh group to be started; */
       // todo: turn `force_new_group' into a sequence_number == SEQ_UNINIT
       // condition
       force_new_group ||
       /* Rewritten event without commit point timestamp (todo: find use case)
        */
       sequence_number == SEQ_UNINIT ||
       /*
         undefined parent (e.g. the very first trans from the master),
         or an old master.
       */
       last_committed == SEQ_UNINIT ||
       /*
         When a gap successor depends on a gap before it, the scheduler has
         to serialize this transaction's execution with the previously
         scheduled ones. Below, for simplicity, it's assumed that such
         a gap-dependency is always the case.
       */
       gap_successor ||
       /*
         The previous group did not have a sequence number assigned.
         Its execution must be finished before the current group
         can be assigned.
         Dependency of the current group on the previous one
         can't be tracked. So let's wait till the former is over.
       */
       last_sequence_number == SEQ_UNINIT);
  /*
    The coordinator waits till all transactions on which the current one
    depends are applied.
  */
  if (!is_new_group) {
    longlong lwm_estimate = estimate_lwm_timestamp();

    if (!clock_leq(last_committed, lwm_estimate) &&
        rli->gaq->assigned_group_index != rli->gaq->entry) {
      /*
        "Unlikely" branch.

        The following block improves a possibly stale lwm and, when the
        waiting condition stays, recomputes min_waited_timestamp and goes
        waiting.
        At awakening, set min_waited_timestamp to the commit_parent in the
        subsequent GAQ index (could be NIL).
      */
      if (wait_for_last_committed_trx(rli, last_committed)) {
        /*
          MTS was waiting for a dependent transaction to finish but either it
          has failed or the applier was requested to stop. In any case, this
          transaction wasn't started yet and should not warn about the
          coordinator stopping in the middle of a transaction, to avoid
          polluting the server error log.
        */
        rli->reported_unsafe_warning = true;
        return -1;
      }
      /*
        Making the slave's max last committed (lwm) satisfy this
        transaction's scheduling condition.
      */
      if (gap_successor) last_lwm_timestamp = sequence_number - 1;
      DBUG_ASSERT(!clock_leq(sequence_number, estimate_lwm_timestamp()));
    }

    delegated_jobs++;

    DBUG_ASSERT(!force_new_group);
  } else {
    DBUG_ASSERT(delegated_jobs >= jobs_done);
    DBUG_ASSERT(is_error || (rli->gaq->len + jobs_done == 1 + delegated_jobs));
    DBUG_ASSERT(rli->mts_group_status == Relay_log_info::MTS_IN_GROUP);

    /*
      Under the new group fall the following use cases:
      - events from an OLD (sequence_number unaware) master;
      - a malformed (missed BEGIN or GTID_NEXT) group, incl. its
        particular form of CREATE..SELECT..from..@user_var (or rand- and
        int- var in place of @user- var).
        The malformed group is handled exceptionally: each event is executed
        as a solitary group, yet by the same (zero id) worker.
    */
    if (-1 == wait_for_workers_to_finish(rli)) return ER_MTS_INCONSISTENT_DATA;

    rli->mts_group_status = Relay_log_info::MTS_IN_GROUP;  // the wait above
                                                           // set it to NOT
    DBUG_ASSERT(min_waited_timestamp == SEQ_UNINIT);
    /*
      The instant last lwm timestamp must be reset when the force flag is up.
    */
    rli->gaq->lwm.sequence_number = last_lwm_timestamp = SEQ_UNINIT;
    delegated_jobs = 1;
    jobs_done = 0;
    force_new_group = false;
    /*
      A non-sequenced event can be followed by a logically related one,
      e.g. a User var event followed by CREATE TABLE.
      This is supported by executing them in one-by-one fashion.
      Todo: remove with the event group parser worklog.
    */
    if (sequence_number == SEQ_UNINIT && last_committed == SEQ_UNINIT)
      rli->last_assigned_worker = *rli->workers.begin();
  }

#ifndef DBUG_OFF
  mysql_mutex_lock(&rli->mts_gaq_LOCK);
  DBUG_ASSERT(is_error || (rli->gaq->len + jobs_done == delegated_jobs));
  mysql_mutex_unlock(&rli->mts_gaq_LOCK);
#endif
  return 0;
}
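
/*
  Illustration (not part of the server): how the is_new_group decision above
  plays out on a small, hypothetical GTID event stream.

    GTID(last_committed=SEQ_UNINIT, seq=SEQ_UNINIT)
      -> new group: undefined parent, full synchronization with Workers.
    GTID(last_committed=1, seq=2), then GTID(last_committed=1, seq=3)
      -> same meta-group: parallel candidates; each may wait until lwm >= 1.
    GTID(last_committed=3, seq=6) arriving right after seq=3
      -> new group: gap successor (seq jumped 3 -> 6), so it is serialized
         against everything scheduled earlier.
*/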

/**
 Logic to attach temporary tables to the worker thread upon
 event execution.
 @param thd THD instance
 @param rli Relay_log_info instance
 @param ev  Query_log_event that is being applied
*/
void Mts_submode_logical_clock::attach_temp_tables(THD *thd,
                                                   const Relay_log_info *rli,
                                                   Query_log_event *ev) {
  bool shifted = false;
  TABLE *table, *cur_table;
  DBUG_TRACE;
  if (!is_mts_worker(thd) || (ev->ends_group() || ev->starts_group())) return;
  /* fetch coordinator's rli */
  Relay_log_info *c_rli = static_cast<const Slave_worker *>(rli)->c_rli;
  DBUG_ASSERT(!thd->temporary_tables);
  mysql_mutex_lock(&c_rli->mts_temp_table_LOCK);
  if (!(table = c_rli->info_thd->temporary_tables)) {
    mysql_mutex_unlock(&c_rli->mts_temp_table_LOCK);
    return;
  }
  c_rli->info_thd->temporary_tables = nullptr;
  do {
    /* store the current table */
    cur_table = table;
    /* move the table pointer to the next in the list, so that we can isolate
    the current table */
    table = table->next;
    std::pair<uint, my_thread_id> st_id_pair =
        get_server_and_thread_id(cur_table);
    if (thd->server_id == st_id_pair.first &&
        thd->variables.pseudo_thread_id == st_id_pair.second) {
      /* shorten the list, singling out the current table */
      if (cur_table->prev)  // not the first node
        cur_table->prev->next = cur_table->next;
      if (cur_table->next)  // not the last node
        cur_table->next->prev = cur_table->prev;
      /* isolate the table */
      cur_table->prev = nullptr;
      cur_table->next = nullptr;
      mts_move_temp_tables_to_thd(thd, cur_table);
    } else
        /* We must shift the C->temp_table pointer to the first table unused
           in this iteration. If all the tables have been used, C->temp_tables
           will point to NULL */
        if (!shifted) {
      c_rli->info_thd->temporary_tables = cur_table;
      shifted = true;
    }
  } while (table);
  mysql_mutex_unlock(&c_rli->mts_temp_table_LOCK);
}

/**
 Logic to detach the temporary tables from the worker threads upon
 event execution.
 @param thd THD instance
 @param rli Relay_log_info instance
*/
void Mts_submode_logical_clock::detach_temp_tables(THD *thd,
                                                   const Relay_log_info *rli,
                                                   Query_log_event *) {
  DBUG_TRACE;
  if (!is_mts_worker(thd)) return;
  /*
    Here in the detach section we will move the tables from the worker to
    the coordinator thread. Since the coordinator is shared, we need to make
    sure that there are no race conditions which may lead to assert failures
    and non-deterministic results.
  */
  Relay_log_info *c_rli = static_cast<const Slave_worker *>(rli)->c_rli;
  mysql_mutex_lock(&c_rli->mts_temp_table_LOCK);
  mts_move_temp_tables_to_thd(c_rli->info_thd, thd->temporary_tables);
  mysql_mutex_unlock(&c_rli->mts_temp_table_LOCK);
  thd->temporary_tables = nullptr;
}

/**
  Logic to get the least occupied worker when mts_submode = logical_clock
  @param rli relay log info of coordinator
  @param ws  array of worker threads
  @param ev  event for which we are searching for a worker.
  @return slave worker thread or NULL when the coordinator has been killed.
 */

Slave_worker *Mts_submode_logical_clock::get_least_occupied_worker(
    Relay_log_info *rli, Slave_worker_array *ws MY_ATTRIBUTE((unused)),
    Log_event *ev) {
  Slave_worker *worker = nullptr;
  PSI_stage_info *old_stage = nullptr;
  THD *thd = rli->info_thd;
  DBUG_TRACE;
#ifndef DBUG_OFF

  if (DBUG_EVALUATE_IF("mts_distribute_round_robin", 1, 0)) {
    worker = ws->at(w_rr % ws->size());
    LogErr(INFORMATION_LEVEL, ER_RPL_WORKER_ID_IS, worker->id,
           static_cast<ulong>(w_rr % ws->size()));
    DBUG_ASSERT(worker != nullptr);
    return worker;
  }
  Slave_committed_queue *gaq = rli->gaq;
  Slave_job_group *ptr_group;
  ptr_group = gaq->get_job_group(rli->gaq->assigned_group_index);
#endif
  /*
    The scheduling works as follows, in this sequence:
      - If this is an internal event of a transaction, use the last assigned
        worker.
      - If the i-th transaction is being scheduled in this group, where "i" <=
        number of available workers, then schedule the events to consecutive
        workers.
      - If the i-th transaction is being scheduled in this group, where "i" >
        number of available workers, then schedule this to the first worker
        that becomes free.
   */
  if (rli->last_assigned_worker) {
    worker = rli->last_assigned_worker;
    DBUG_ASSERT(ev->get_type_code() != binary_log::USER_VAR_EVENT ||
                worker->id == 0 || rli->curr_group_seen_begin ||
                rli->curr_group_seen_gtid);
  } else {
    worker = get_free_worker(rli);

    DBUG_ASSERT(ev->get_type_code() != binary_log::USER_VAR_EVENT ||
                rli->curr_group_seen_begin || rli->curr_group_seen_gtid);

    if (worker == nullptr) {
      struct timespec ts[2];

      set_timespec_nsec(&ts[0], 0);
      // Update thd info as waiting for workers to finish.
      thd->enter_stage(&stage_slave_waiting_for_workers_to_process_queue,
                       old_stage, __func__, __FILE__, __LINE__);
      while (!worker && !thd->killed) {
        /*
          Busy wait, yielding thread control before the next attempt
          to find a free worker. As of now, a worker
          can't have more than one assigned group of events in its
          queue.

          todo: replace this At-Most-One assignment policy with
                First Available Worker, as
                this method clearly can't be considered optimal.
        */
#if !defined(_WIN32)
        sched_yield();
#else
        my_sleep(rli->mts_coordinator_basic_nap);
#endif
        worker = get_free_worker(rli);
      }
      THD_STAGE_INFO(thd, *old_stage);
      set_timespec_nsec(&ts[1], 0);
      rli->mts_total_wait_worker_avail += diff_timespec(&ts[1], &ts[0]);
      rli->mts_wq_no_underrun_cnt++;
      /*
        Even though OPTION_BEGIN is set, the 'BEGIN' event is not dispatched
        to any worker thread. So the flag is removed and the Coordinator
        thread will not try to finish the group before abort.
      */
      if (worker == nullptr)
        rli->info_thd->variables.option_bits &= ~OPTION_BEGIN;
    }
    if (rli->get_commit_order_manager() != nullptr && worker != nullptr)
      rli->get_commit_order_manager()->register_trx(worker);
  }

  DBUG_ASSERT(ptr_group);
  // assert that we have a worker thread for this event or the slave has
  // stopped.
  DBUG_ASSERT(worker != nullptr || thd->killed);
  /* The master may have sent db partition info; make sure we never use it. */
  if (ev->get_type_code() == binary_log::QUERY_EVENT)
    static_cast<Query_log_event *>(ev)->mts_accessed_dbs = 0;

  return worker;
}
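
/*
  Illustration (not part of the server): the assignment policy above on a
  hypothetical two-worker applier.

    Group events: GTID, BEGIN, INSERT, COMMIT
      - At GTID, last_assigned_worker is NULL, so get_free_worker() scans
        and returns W0 (its jobs queue is empty); the coordinator then
        remembers W0 as rli->last_assigned_worker.
      - BEGIN, INSERT and COMMIT are internal events of the same transaction
        and reuse last_assigned_worker without another scan.
    Next group: if both W0 and W1 still have non-empty queues, the
    coordinator busy-waits (sched_yield()/my_sleep()) until one drains.
*/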

/**
  Protected method to fetch a worker having no events assigned.
  The method is supposed to be called by the Coordinator; therefore,
  a comparison like w_i->jobs.len == 0 must (eventually) succeed.

  todo: consider optimizing the scan, which is getting more expensive with
  a greater # of Workers.

  @return  a pointer to a Worker or NULL if none is free.
*/
Slave_worker *Mts_submode_logical_clock::get_free_worker(Relay_log_info *rli) {
  for (Slave_worker **it = rli->workers.begin(); it != rli->workers.end();
       ++it) {
    Slave_worker *w_i = *it;
    if (w_i->jobs.len == 0) return w_i;
  }
  return nullptr;
}

/**
  Waits for slave workers to finish off the pending tasks before returning.
  Used in this submode to make sure that all assigned jobs have been done.

  @param rli  coordinator rli.
  @param ignore worker to ignore.
  @return -1 for error.
           0 no error.
 */
int Mts_submode_logical_clock::wait_for_workers_to_finish(
    Relay_log_info *rli, MY_ATTRIBUTE((unused)) Slave_worker *ignore) {
  PSI_stage_info *old_stage = nullptr;
  THD *thd = rli->info_thd;
  DBUG_TRACE;
  DBUG_PRINT("info", ("delegated %d, jobs_done %d", delegated_jobs, jobs_done));
  // Update thd info as waiting for workers to finish.
  thd->enter_stage(&stage_slave_waiting_for_workers_to_process_queue, old_stage,
                   __func__, __FILE__, __LINE__);
  while (delegated_jobs > jobs_done && !thd->killed && !is_error) {
    // Todo: consider replacing with a. GAQ::get_lwm_timestamp() or
    // b. (better) pthread wait+signal, similarly to the DB type.
    if (mts_checkpoint_routine(rli, true)) return -1;
  }

  // Check if there is a failure on a not-ignored Worker
  for (Slave_worker **it = rli->workers.begin(); it != rli->workers.end();
       ++it) {
    Slave_worker *w_i = *it;
    if (w_i->running_status != Slave_worker::RUNNING) return -1;
  }

  DBUG_EXECUTE_IF("wait_for_workers_to_finish_after_wait", {
    const char act[] = "now WAIT_FOR coordinator_continue";
    DBUG_ASSERT(!debug_sync_set_action(rli->info_thd, STRING_WITH_LEN(act)));
  });

  // The current commit point sequence may end here (e.g. Rotate to a new log)
  rli->gaq->lwm.sequence_number = SEQ_UNINIT;
  // Restore previous info.
  THD_STAGE_INFO(thd, *old_stage);
  DBUG_PRINT("info", ("delegated %d, jobs_done %d, Workers have finished their"
                      " jobs",
                      delegated_jobs, jobs_done));
  rli->mts_group_status = Relay_log_info::MTS_NOT_IN_GROUP;
  return !thd->killed && !is_error ? 0 : -1;
}

/**
  Protected method to fetch the server_id and pseudo_thread_id from a
  temporary table.
  @param  table instance pointer of the TABLE structure.
  @return std::pair<uint, my_thread_id>
  @note   It is the caller's responsibility to make sure we call this
          function only for temp tables.
 */
std::pair<uint, my_thread_id>
Mts_submode_logical_clock::get_server_and_thread_id(TABLE *table) {
  DBUG_TRACE;
  const char *extra_string = table->s->table_cache_key.str;
  size_t extra_string_len = table->s->table_cache_key.length;
  // assert will fail when called with non-temporary tables.
  DBUG_ASSERT(table->s->table_cache_key.length > 0);
  std::pair<uint, my_thread_id> ret_pair = std::make_pair(
      /* the last 8 bytes contain the server_id + pseudo_thread_id */
      // fetch the first 4 of those bytes to get the server id.
      uint4korr(extra_string + extra_string_len - 8),
      /* the next 4 bytes contain the pseudo_thread_id */
      uint4korr(extra_string + extra_string_len - 4));
  return ret_pair;
}
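
/*
  Illustration (not part of the server): the tail-byte layout decoded above,
  shown on a hypothetical temporary table whose originating server_id is 3
  and pseudo_thread_id is 10. uint4korr() reads 4 bytes little-endian:

    table_cache_key:  ... | 03 00 00 00 | 0a 00 00 00 |
                          ^ server_id     ^ pseudo_thread_id
    uint4korr(end - 8) == 3      // ret_pair.first
    uint4korr(end - 4) == 10     // ret_pair.second
*/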