1 /* Copyright (c) 2011, 2015, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #ifndef RPL_RLI_PDB_H
24 
25 #define RPL_RLI_PDB_H
26 
27 #ifdef HAVE_REPLICATION
28 
29 #include "sql_string.h"
30 #include "rpl_rli.h"
31 #include <my_sys.h>
32 #include <my_bitmap.h>
33 #include "rpl_slave.h"
34 
35 /**
36   Legends running throughout the module:
37 
38   C  - Coordinator
39   CP - checkpoint
40   W  - Worker
41 
42   B-event event that Begins a group (a transaction)
43   T-event event that Terminates a group (a transaction)
44 */
45 
/*
  Assigned Partition Hash (APH) entry.

  Maps one database (the partition unit of the multi-threaded slave)
  to the Worker currently executing events for it, together with the
  bookkeeping the Coordinator needs to rebalance or tear down the
  assignment.
*/
typedef struct st_db_worker_hash_entry
{
  uint  db_len;          // length of @c db, the hash key
  const char *db;        // database name this entry is keyed on
  Slave_worker *worker;  // Worker currently assigned to @c db
  /*
    The number of transactions pending on this database.
    This should only be modified under the lock slave_worker_hash_lock.
   */
  long usage;
  /*
    The list of temp tables belonging to @c db database is
    attached to an assigned @c worker to become its thd->temporary_tables.
    The list is updated with every ddl incl CREATE, DROP.
    It is removed from the entry and merged to the coordinator's
    thd->temporary_tables in case of events: slave stops, APH oversize.
  */
  TABLE* volatile temporary_tables;

  /* todo: relax concurrency to mimic record-level locking.
     That is to augmenting the entry with mutex/cond pair
     pthread_mutex_t
     pthread_cond_t
     timestamp updated_at; */

} db_worker_hash_entry;
73 
74 bool init_hash_workers(ulong slave_parallel_workers);
75 void destroy_hash_workers(Relay_log_info*);
76 Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
77                                db_worker_hash_entry **ptr_entry,
78                                bool need_temp_tables, Slave_worker *w);
79 Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
80 int wait_for_workers_to_finish(Relay_log_info const *rli,
81                                Slave_worker *ignore= NULL);
82 
83 #define SLAVE_INIT_DBS_IN_GROUP 4     // initial allocation for CGEP dynarray
84 
85 #define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
86 
/*
  One element of a Worker's jobs queue.
  @c data is an opaque payload pointer — presumably the event the Worker
  is to execute (confirm at the head_queue()/append call sites).
*/
typedef struct slave_job_item
{
  void *data;  // opaque job payload; ownership rules defined by the queue users
} Slave_job_item;
91 
92 /**
93    The class defines a type of queue with a predefined max size that is
94    implemented using the circular memory buffer.
95    That is items of the queue are accessed as indexed elements of
96    the array buffer in a way that when the index value reaches
97    a max value it wraps around to point to the first buffer element.
98 */
99 class circular_buffer_queue
100 {
101 public:
102 
103   DYNAMIC_ARRAY Q;
104   ulong size;           // the Size of the queue in terms of element
105   ulong avail;          // first Available index to append at (next to tail)
106   ulong entry;          // the head index or the entry point to the queue.
107   volatile ulong len;   // actual length
108   bool inited_queue;
109 
110   circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
size(max)111     size(max), avail(0), entry(max), len(0), inited_queue(FALSE)
112   {
113     DBUG_ASSERT(size < (ulong) -1);
114     if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
115       inited_queue= TRUE;
116   }
circular_buffer_queue()117   circular_buffer_queue () : inited_queue(FALSE) {}
~circular_buffer_queue()118   ~circular_buffer_queue ()
119   {
120     if (inited_queue)
121       delete_dynamic(&Q);
122   }
123 
124    /**
125       Content of the being dequeued item is copied to the arg-pointer
126       location.
127 
128       @return the queue's array index that the de-queued item
129       located at, or
130       an error encoded in beyond the index legacy range.
131    */
132   ulong de_queue(uchar *);
133   /**
134      Similar to de_queue but extracting happens from the tail side.
135   */
136   ulong de_tail(uchar *val);
137 
138   /**
139     return the index where the arg item locates
140            or an error encoded as a value in beyond of the legacy range
141            [0, size) (value `size' is excluded).
142   */
143   ulong en_queue(void *item);
144   /**
145      return the value of @c data member of the head of the queue.
146   */
147   void* head_queue();
148   bool   gt(ulong i, ulong k); // comparision of ordering of two entities
149   /* index is within the valid range */
in(ulong k)150   bool in(ulong k) { return !empty() &&
151       (entry > avail ? (k >= entry || k < avail) : (k >= entry && k < avail)); }
empty()152   bool empty() { return entry == size; }
full()153   bool full() { return avail == size; }
154 };
155 
156 typedef struct st_slave_job_group
157 {
158   char *group_master_log_name;   // (actually redundant)
159   /*
160     T-event lop_pos filled by Worker for CheckPoint (CP)
161   */
162   my_off_t group_master_log_pos;
163 
164   /*
165      When relay-log name changes  allocates and fill in a new name of relay-log,
166      otherwise it fills in NULL.
167      Coordinator keeps track of each Worker has been notified on the updating
168      to make sure the routine runs once per change.
169 
170      W checks the value at commit and memoriezes a not-NULL.
171      Freeing unless NULL is left to Coordinator at CP.
172   */
173   char     *group_relay_log_name; // The value is last seen relay-log
174   my_off_t group_relay_log_pos;  // filled by W
175   ulong worker_id;
176   Slave_worker *worker;
177   ulonglong total_seqno;
178 
179   my_off_t master_log_pos;       // B-event log_pos
180   /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
181   uint  checkpoint_seqno;
182   my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
183   char*    checkpoint_log_name;
184   my_off_t checkpoint_relay_log_pos; // T-event lop_pos filled by W for CheckPoint
185   char*    checkpoint_relay_log_name;
186   volatile uchar done;  // Flag raised by W,  read and reset by Coordinator
187   ulong    shifted;     // shift the last CP bitmap at receiving a new CP
188   time_t   ts;          // Group's timestampt to update Seconds_behind_master
189 #ifndef DBUG_OFF
190   bool     notified;    // to debug group_master_log_name change notification
191 #endif
192   /*
193     Coordinator fills the struct with defaults and options at starting of
194     a group distribution.
195   */
resetst_slave_job_group196   void reset(my_off_t master_pos, ulonglong seqno)
197   {
198     master_log_pos= master_pos;
199     group_master_log_pos= group_relay_log_pos= 0;
200     group_master_log_name= NULL; // todo: remove
201     group_relay_log_name= NULL;
202     worker_id= MTS_WORKER_UNDEF;
203     total_seqno= seqno;
204     checkpoint_log_name= NULL;
205     checkpoint_log_pos= 0;
206     checkpoint_relay_log_name= NULL;
207     checkpoint_relay_log_pos= 0;
208     checkpoint_seqno= (uint) -1;
209     done= 0;
210     ts= 0;
211 #ifndef DBUG_OFF
212     notified= false;
213 #endif
214   }
215 } Slave_job_group;
216 
217 /**
218   Group Assigned Queue whose first element identifies first gap
219   in committed sequence. The head of the queue is therefore next to
220   the low-water-mark.
221 */
222 class Slave_committed_queue : public circular_buffer_queue
223 {
224 public:
225 
226   bool inited;
227 
228   /* master's Rot-ev exec */
229   void update_current_binlog(const char *post_rotate);
230 
231   /*
232      The last checkpoint time Low-Water-Mark
233   */
234   Slave_job_group lwm;
235 
236   /* last time processed indexes for each worker */
237   DYNAMIC_ARRAY last_done;
238 
239   /* the being assigned group index in GAQ */
240   ulong assigned_group_index;
241 
242   Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
243                          uint inc= 0)
circular_buffer_queue(el_size,max,inc)244     : circular_buffer_queue(el_size, max, inc), inited(FALSE)
245   {
246     uint k;
247     ulonglong l= 0;
248 
249     if (max >= (ulong) -1 || !circular_buffer_queue::inited_queue)
250       return;
251     else
252       inited= TRUE;
253     my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
254     for (k= 0; k < n; k++)
255       insert_dynamic(&last_done, (uchar*) &l);  // empty for each Worker
256     lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
257     lwm.group_relay_log_name[0]= 0;
258   }
259 
~Slave_committed_queue()260   ~Slave_committed_queue ()
261   {
262     if (inited)
263     {
264       delete_dynamic(&last_done);
265       my_free(lwm.group_relay_log_name);
266       free_dynamic_items();  // free possibly left allocated strings in GAQ list
267     }
268   }
269 
270 #ifndef DBUG_OFF
271   bool count_done(Relay_log_info* rli);
272 #endif
273 
274   /* Checkpoint routine refreshes the queue */
275   ulong move_queue_head(DYNAMIC_ARRAY *ws);
276   /* Method is for slave shutdown time cleanup */
277   void free_dynamic_items();
278   /*
279      returns a pointer to Slave_job_group struct instance as indexed by arg
280      in the circular buffer dyn-array
281   */
get_job_group(ulong ind)282   Slave_job_group* get_job_group(ulong ind)
283   {
284     return (Slave_job_group*) dynamic_array_ptr(&Q, ind);
285   }
286 
287   /**
288      Assignes @c assigned_group_index to an index of enqueued item
289      and returns it.
290   */
en_queue(void * item)291   ulong en_queue(void *item)
292   {
293     return assigned_group_index= circular_buffer_queue::en_queue(item);
294   }
295 
296 };
297 
/*
  A Worker's assignment queue: the circular buffer specialized with
  flow-control state shared between Coordinator and Worker.
*/
class Slave_jobs_queue : public circular_buffer_queue
{
public:

  /*
     Coordinator marks with true, Worker signals back at queue back to
     available
  */
  bool overfill;
  // total number of times the Coordinator had to wait on an overfilled queue
  ulonglong waited_overfill;
};
309 
310 class Slave_worker : public Relay_log_info
311 {
312 public:
313   Slave_worker(Relay_log_info *rli
314 #ifdef HAVE_PSI_INTERFACE
315                ,PSI_mutex_key *param_key_info_run_lock,
316                PSI_mutex_key *param_key_info_data_lock,
317                PSI_mutex_key *param_key_info_sleep_lock,
318                PSI_mutex_key *param_key_info_data_cond,
319                PSI_mutex_key *param_key_info_start_cond,
320                PSI_mutex_key *param_key_info_stop_cond,
321                PSI_mutex_key *param_key_info_sleep_cond
322 #endif
323                , uint param_id
324               );
325   virtual ~Slave_worker();
326 
327   Slave_jobs_queue jobs;   // assignment queue containing events to execute
328   mysql_mutex_t jobs_lock; // mutex for the jobs queue
329   mysql_cond_t  jobs_cond; // condition variable for the jobs queue
330   Relay_log_info *c_rli;   // pointer to Coordinator's rli
331   DYNAMIC_ARRAY curr_group_exec_parts; // Current Group Executed Partitions
332   bool curr_group_seen_begin; // is set to TRUE with explicit B-event
333   ulong id;                 // numberic identifier of the Worker
334 
335   /*
336     Worker runtime statictics
337   */
338   // the index in GAQ of the last processed group by this Worker
339   volatile ulong last_group_done_index;
340   ulonglong last_groups_assigned_index; // index of previous group assigned to worker
341   ulong wq_empty_waits;  // how many times got idle
342   ulong events_done;     // how many events (statements) processed
343   ulong groups_done;     // how many groups (transactions) processed
344   volatile int curr_jobs; // number of active  assignments
345   // number of partitions allocated to the worker at point in time
346   long usage_partition;
347   // symmetric to rli->mts_end_group_sets_max_dbs
348   bool end_group_sets_max_dbs;
349 
350   volatile bool relay_log_change_notified; // Coord sets and resets, W can read
351   volatile bool checkpoint_notified; // Coord sets and resets, W can read
352   volatile bool master_log_change_notified; // Coord sets and resets, W can read
353   ulong bitmap_shifted;  // shift the last bitmap at receiving new CP
354   // WQ current excess above the overrun level
355   long wq_overrun_cnt;
356   /*
357     number of events starting from which Worker queue is regarded as
358     close to full. The number of the excessive events yields a weight factor
359     to compute Coordinator's nap.
360   */
361   ulong overrun_level;
362   /*
363      reverse to overrun: the number of events below which Worker is
364      considered underruning
365   */
366   ulong underrun_level;
367   /*
368     Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
369     When WQ length is dropped below overrun the counter is reset.
370   */
371   ulong excess_cnt;
372   /*
373     Coordinates of the last CheckPoint (CP) this Worker has
374     acknowledged; part of is persisent data
375   */
376   char checkpoint_relay_log_name[FN_REFLEN];
377   ulonglong checkpoint_relay_log_pos;
378   char checkpoint_master_log_name[FN_REFLEN];
379   ulonglong checkpoint_master_log_pos;
380   MY_BITMAP group_executed; // bitmap describes groups executed after last CP
381   MY_BITMAP group_shifted;  // temporary bitmap to compute group_executed
382   ulong checkpoint_seqno;   // the most significant ON bit in group_executed
383   enum en_running_state
384   {
385     NOT_RUNNING= 0,
386     RUNNING= 1,
387     ERROR_LEAVING= 2,         // is set by Worker
388     STOP= 3,                  // is set by Coordinator upon receiving STOP
389     STOP_ACCEPTED= 4          // is set by worker upon completing job when STOP SLAVE is issued
390   };
391   /*
392     The running status is guarded by jobs_lock mutex that a writer
393     Coordinator or Worker itself needs to hold when write a new value.
394   */
395   en_running_state volatile running_status;
396   /*
397     exit_incremented indicates whether worker has contributed to max updated index.
398     By default it is set to false. When the worker contibutes for the first time this
399     variable is set to true.
400   */
401   bool exit_incremented;
402 
403   int init_worker(Relay_log_info*, ulong);
404   int rli_init_info(bool);
405   int flush_info(bool force= FALSE);
406   static size_t get_number_worker_fields();
407   void slave_worker_ends_group(Log_event*, int);
408   const char *get_master_log_name();
get_master_log_pos()409   ulonglong get_master_log_pos() { return master_log_pos; };
set_master_log_pos(ulong val)410   ulonglong set_master_log_pos(ulong val) { return master_log_pos= val; };
411   bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
412   /*
413     When commit fails clear bitmap for executed worker group. Revert back the
414     positions to the old positions that existed before commit using the checkpoint.
415 
416     @param Slave_job_group a pointer to Slave_job_group struct instance which
417     holds group master log pos, group relay log pos and checkpoint positions.
418   */
419   void rollback_positions(Slave_job_group *ptr_g);
420   bool reset_recovery_info();
421   /**
422      Different from the parent method in that this does not delete
423      rli_description_event.
424      The method runs by Coordinator when Worker are synched or being
425      destroyed.
426   */
set_rli_description_event(Format_description_log_event * fdle)427   void set_rli_description_event(Format_description_log_event *fdle)
428   {
429     DBUG_ASSERT(!fdle || (running_status == Slave_worker::RUNNING && info_thd));
430 #ifndef DBUG_OFF
431     if (fdle)
432       mysql_mutex_assert_owner(&jobs_lock);
433 #endif
434 
435     if (fdle)
436       adapt_to_master_version(fdle);
437     rli_description_event= fdle;
438   }
439 
reset_gaq_index()440   inline void reset_gaq_index() { gaq_index= c_rli->gaq->size; };
set_gaq_index(ulong val)441   inline void set_gaq_index(ulong val)
442   {
443     if (gaq_index == c_rli->gaq->size)
444       gaq_index= val;
445   };
446 
447   // overridden new and delete operators for 64 byte alignment
448   static void* operator new(size_t request);
449   static void operator delete(void * ptr);
450 
451 protected:
452 
453   virtual void do_report(loglevel level, int err_code,
454                          const char *msg, va_list v_args) const;
455 
456 private:
457   ulong gaq_index;          // GAQ index of the current assignment
458   ulonglong master_log_pos; // event's cached log_pos for possibile error report
459   void end_info();
460   bool read_info(Rpl_info_handler *from);
461   bool write_info(Rpl_info_handler *to);
462   Slave_worker& operator=(const Slave_worker& info);
463   Slave_worker(const Slave_worker& info);
464 };
465 
466 void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
467 bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item);
468 bool set_max_updated_index_on_stop(Slave_worker *worker,
469                                    Slave_job_item *job_item);
470 
471 TABLE* mts_move_temp_table_to_entry(TABLE*, THD*, db_worker_hash_entry*);
472 TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*);
473 #endif // HAVE_REPLICATION
474 #endif
475