1 /* Copyright (c) 2013, 2017, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 #ifndef MTS_SUBMODE_H
23 #define MTS_SUBMODE_H
24 
25 #include "my_global.h"
26 #include "my_atomic.h"         // my_atomic_load64
27 #include "my_thread_local.h"   // my_thread_id
28 #include "prealloced_array.h"  // Prealloced_array
29 #include "binlog_event.h"      // SEQ_UNINIT
30 
31 class Log_event;
32 class Mts_submode_database;
33 class Mts_submode_logical_clock;
34 class Query_log_event;
35 class Relay_log_info;
36 class Slave_worker;
37 class THD;
38 struct TABLE;
39 typedef Prealloced_array<Slave_worker*, 4> Slave_worker_array;
40 
41 enum enum_mts_parallel_type {
42   /* Parallel slave based on Database name */
43   MTS_PARALLEL_TYPE_DB_NAME= 0,
44   /* Parallel slave based on group information from Binlog group commit */
45   MTS_PARALLEL_TYPE_LOGICAL_CLOCK= 1
46 };
47 
48 // Extend the following class as per requirement for each sub mode
49 class Mts_submode
50 {
51 private:
52 protected:
53 
54   /* Parallel type */
55   enum_mts_parallel_type  type;
56 public:
Mts_submode()57   Mts_submode(){}
get_type()58   inline enum_mts_parallel_type get_type(){return type;}
59   // pure virtual methods. Should be extended in the derieved class
60 
61   /* Logic to schedule the next event. called at the B event for each
62      transaction */
63   virtual int schedule_next_event(Relay_log_info* rli,
64                                    Log_event *ev)= 0;
65 
66   /* logic to attach temp tables Should be extended in the derieved class */
67   virtual void attach_temp_tables(THD *thd, const Relay_log_info* rli,
68                                   Query_log_event *ev)= 0;
69 
70   /* logic to detach temp tables. Should be extended in the derieved class  */
71   virtual void detach_temp_tables(THD *thd, const Relay_log_info* rli,
72                                   Query_log_event *ev)= 0;
73 
74   /* returns the least occupied worker. Should be extended in the derieved class  */
75   virtual Slave_worker* get_least_occupied_worker(Relay_log_info* rli,
76                                                   Slave_worker_array *ws,
77                                                   Log_event *ev)= 0;
78   /* wait for slave workers to finish */
79   virtual int wait_for_workers_to_finish(Relay_log_info *rli,
80                                          Slave_worker *ignore= NULL)=0;
81 
~Mts_submode()82   virtual ~Mts_submode(){}
83 };
84 
85 /**
86   DB partitioned submode
87   For significance of each method check definition of Mts_submode
88 */
89 class Mts_submode_database: public Mts_submode
90 {
91 public:
Mts_submode_database()92   Mts_submode_database()
93   {
94     type= MTS_PARALLEL_TYPE_DB_NAME;
95   }
96   int schedule_next_event(Relay_log_info* rli, Log_event *ev);
97   void attach_temp_tables(THD *thd, const Relay_log_info* rli,
98                                                       Query_log_event *ev);
99   void detach_temp_tables(THD *thd, const Relay_log_info* rli,
100                                                       Query_log_event *ev);
101   Slave_worker* get_least_occupied_worker(Relay_log_info* rli,
102                                           Slave_worker_array *ws,
103                                           Log_event *ev);
~Mts_submode_database()104   ~Mts_submode_database(){};
105   int wait_for_workers_to_finish(Relay_log_info  *rli,
106                                  Slave_worker *ignore= NULL);
107 };
108 
109 /**
110   Parallelization using Master parallelization information
111   For significance of each method check definition of Mts_submode
112  */
113 class Mts_submode_logical_clock: public Mts_submode
114 {
115 private:
116   bool first_event, force_new_group;
117   bool is_new_group;
118   uint delegated_jobs;
119   /* "instant" value of committed transactions low-water-mark */
120   longlong last_lwm_timestamp;
121   /* GAQ index corresponding to the min commit point */
122   ulong last_lwm_index;
123   longlong last_committed;
124   longlong sequence_number;
125 
126 public:
127   uint jobs_done;
128   bool is_error;
129   /*
130     the logical timestamp of the olderst transaction that is being waited by
131     before to resume scheduling.
132   */
133   longlong min_waited_timestamp;
134   /*
135     Committed transactions and those that are waiting for their commit parents
136     comprise sequences whose items are identified as GAQ index.
137     An empty sequence is described by the following magic value which can't
138     be in the GAQ legitimate range.
139     todo: an alternative could be to pass a magic value to the constructor.
140     E.g GAQ.size as a good candidate being outside of the valid range.
141     That requires further wl6314 refactoring in activation/deactivation
142     of the scheduler.
143   */
144   static const ulong INDEX_UNDEF= (ulong) -1;
145 
146 protected:
147   std::pair<uint, my_thread_id> get_server_and_thread_id(TABLE* table);
148   Slave_worker* get_free_worker(Relay_log_info *rli);
149 public:
150   Mts_submode_logical_clock();
151   int schedule_next_event(Relay_log_info* rli, Log_event *ev);
152   void attach_temp_tables(THD *thd, const Relay_log_info* rli,
153                                                       Query_log_event *ev);
154   void detach_temp_tables(THD *thd, const Relay_log_info* rli,
155                                                       Query_log_event *ev);
156   Slave_worker* get_least_occupied_worker(Relay_log_info* rli,
157                                           Slave_worker_array *ws,
158                                           Log_event *ev);
159   /* Sets the force new group variable */
start_new_group()160   inline void start_new_group()
161   {
162     force_new_group= true;
163     first_event= true;
164   }
165   /**
166     Withdraw the delegated_job increased by the group.
167   */
withdraw_delegated_job()168   void withdraw_delegated_job()
169   {
170     delegated_jobs--;
171   }
172   int wait_for_workers_to_finish(Relay_log_info  *rli,
173                                  Slave_worker *ignore= NULL);
174   bool wait_for_last_committed_trx(Relay_log_info* rli,
175                                    longlong last_committed_arg,
176                                    longlong lwm_estimate_arg);
177   /*
178     LEQ comparison of two logical timestamps follows regular rules for
179     integers. SEQ_UNINIT is regarded as the least value in the clock domain.
180 
181     @param a  the lhs logical timestamp value
182     @param b  the rhs logical timestamp value
183 
184     @return   true  when a "<=" b,
185               false otherwise
186   */
clock_leq(longlong a,longlong b)187   static bool clock_leq(longlong a, longlong b)
188   {
189     if (a == SEQ_UNINIT)
190       return true;
191     else if (b == SEQ_UNINIT)
192       return false;
193     else
194       return a <= b;
195   }
196 
197   longlong get_lwm_timestamp(Relay_log_info *rli, bool need_lock);
estimate_lwm_timestamp()198   longlong estimate_lwm_timestamp()
199   {
200     return my_atomic_load64(&last_lwm_timestamp);
201   };
~Mts_submode_logical_clock()202   ~Mts_submode_logical_clock() {}
203 };
204 
205 #endif /*MTS_SUBMODE_H*/
206