/* Copyright (c) 2011, 2015, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */

#ifndef RPL_RLI_PDB_H

#define RPL_RLI_PDB_H

#ifdef HAVE_REPLICATION

#include "sql_string.h"
#include "rpl_rli.h"
#include <my_sys.h>
#include <my_bitmap.h>
#include "rpl_slave.h"

/**
  Legend used throughout the module:

  C  - Coordinator
  CP - checkpoint
  W  - Worker

  B-event - an event that Begins a group (a transaction)
  T-event - an event that Terminates a group (a transaction)
*/

/* Assigned Partition Hash (APH) entry */
typedef struct st_db_worker_hash_entry
{
  uint  db_len;
  const char *db;
  Slave_worker *worker;
  /*
    The number of transactions pending on this database.
    This should only be modified under the lock slave_worker_hash_lock.
   */
  long usage;
  /*
    The list of temporary tables belonging to the @c db database is
    attached to the assigned @c worker to become its thd->temporary_tables.
    The list is updated with every DDL, including CREATE and DROP.
    It is removed from the entry and merged into the Coordinator's
    thd->temporary_tables when the slave stops or the APH grows oversized.
  */
  TABLE* volatile temporary_tables;

  /* todo: relax concurrency to mimic record-level locking.
     That is, augment the entry with a mutex/cond pair:
     pthread_mutex_t
     pthread_cond_t
     timestamp updated_at; */

} db_worker_hash_entry;

bool init_hash_workers(ulong slave_parallel_workers);
void destroy_hash_workers(Relay_log_info*);
Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
                               db_worker_hash_entry **ptr_entry,
                               bool need_temp_tables, Slave_worker *w);
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
int wait_for_workers_to_finish(Relay_log_info const *rli,
                               Slave_worker *ignore= NULL);
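
/*
  Illustrative sketch only (not part of the API): a Coordinator-side caller
  could resolve the Worker assigned to a database touched by the current
  group roughly as below. The names `dbname', `rli' and `last_worker' are
  hypothetical; the real call sites live in the event distribution code.

    db_worker_hash_entry *entry= NULL;
    Slave_worker *w=
      map_db_to_worker(dbname, rli, &entry,
                       true,          // need_temp_tables
                       last_worker);  // last assigned Worker, possibly NULL
    if (w == NULL)
      ; // no Worker could be mapped; the caller reports the error
    // `entry' now refers to the APH record for `dbname' (see
    // db_worker_hash_entry above); its usage counter is maintained under
    // slave_worker_hash_lock.
*/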

#define SLAVE_INIT_DBS_IN_GROUP 4     // initial allocation for CGEP dynarray

#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2

typedef struct slave_job_item
{
  void *data;
} Slave_job_item;

/**
   The class defines a queue type with a predefined maximum size,
   implemented as a circular memory buffer.
   That is, items of the queue are accessed as indexed elements of
   the array buffer in such a way that when the index value reaches
   the maximum value it wraps around to point to the first buffer element.
*/
class circular_buffer_queue
{
public:

  DYNAMIC_ARRAY Q;
  ulong size;           // the size of the queue in terms of elements
  ulong avail;          // first available index to append at (next to tail)
  ulong entry;          // the head index or the entry point to the queue
  volatile ulong len;   // actual length
  bool inited_queue;

  circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
    size(max), avail(0), entry(max), len(0), inited_queue(FALSE)
  {
    DBUG_ASSERT(size < (ulong) -1);
    if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
      inited_queue= TRUE;
  }
  circular_buffer_queue () : inited_queue(FALSE) {}
  ~circular_buffer_queue ()
  {
    if (inited_queue)
      delete_dynamic(&Q);
  }

   /**
      The content of the item being dequeued is copied to the
      location pointed to by the argument.

      @return the queue's array index that the dequeued item
      was located at, or
      an error encoded as a value beyond the legal index range.
   */
  ulong de_queue(uchar *);
  /**
     Similar to de_queue but extraction happens from the tail side.
  */
  ulong de_tail(uchar *val);

  /**
    @return the index where the argument item is located,
            or an error encoded as a value beyond the legal range
            [0, size) (the value `size' is excluded).
  */
  ulong en_queue(void *item);
  /**
     Returns the value of the @c data member of the head of the queue.
  */
  void* head_queue();
  bool   gt(ulong i, ulong k); // comparison of the ordering of two entities
  /* index is within the valid range */
  bool in(ulong k) { return !empty() &&
      (entry > avail ? (k >= entry || k < avail) : (k >= entry && k < avail)); }
  bool empty() { return entry == size; }
  bool full() { return avail == size; }
};
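
/*
  Illustrative sketch only: a minimal, single-threaded use of the queue.
  `ev' is a hypothetical payload pointer; a value returned by en_queue()
  or de_queue() outside the legal range [0, size) signals an error, as
  documented above.

    circular_buffer_queue q(sizeof(Slave_job_item), 16);
    Slave_job_item item= { ev };                 // wrap the payload
    if (q.en_queue(&item) >= q.size)
      ; // the queue was full, handle the error
    Slave_job_item out;
    if (!q.empty())
      q.de_queue((uchar*) &out);                 // pop a copy from the head
*/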

typedef struct st_slave_job_group
{
  char *group_master_log_name;   // (actually redundant)
  /*
    T-event log_pos filled in by the Worker for the CheckPoint (CP)
  */
  my_off_t group_master_log_pos;

  /*
     When the relay-log name changes, the Coordinator allocates and fills in
     a new relay-log name; otherwise it fills in NULL.
     The Coordinator keeps track of whether each Worker has been notified
     of the update to make sure the routine runs once per change.

     W checks the value at commit and memorizes a non-NULL one.
     Freeing it, unless it is NULL, is left to the Coordinator at CP.
  */
  char     *group_relay_log_name; // the value is the last seen relay-log name
  my_off_t group_relay_log_pos;  // filled in by W
  ulong worker_id;
  Slave_worker *worker;
  ulonglong total_seqno;

  my_off_t master_log_pos;       // B-event log_pos
  /* checkpoint coordinates are reset by periodic and special (Rotate event) CPs */
  uint  checkpoint_seqno;
  my_off_t checkpoint_log_pos; // T-event log_pos filled in by W for CheckPoint
  char*    checkpoint_log_name;
  my_off_t checkpoint_relay_log_pos; // T-event log_pos filled in by W for CheckPoint
  char*    checkpoint_relay_log_name;
  volatile uchar done;  // flag raised by W, read and reset by the Coordinator
  ulong    shifted;     // amount to shift the last CP bitmap by at receiving a new CP
  time_t   ts;          // the group's timestamp to update Seconds_behind_master
#ifndef DBUG_OFF
  bool     notified;    // to debug group_master_log_name change notification
#endif
  /*
    The Coordinator fills the struct with defaults and options at the start
    of a group distribution.
  */
  void reset(my_off_t master_pos, ulonglong seqno)
  {
    master_log_pos= master_pos;
    group_master_log_pos= group_relay_log_pos= 0;
    group_master_log_name= NULL; // todo: remove
    group_relay_log_name= NULL;
    worker_id= MTS_WORKER_UNDEF;
    total_seqno= seqno;
    checkpoint_log_name= NULL;
    checkpoint_log_pos= 0;
    checkpoint_relay_log_name= NULL;
    checkpoint_relay_log_pos= 0;
    checkpoint_seqno= (uint) -1;
    done= 0;
#ifndef DBUG_OFF
    notified= false;
#endif
  }
} Slave_job_group;
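
/*
  Illustrative sketch only: the Coordinator prepares a group descriptor at
  the start of a group distribution. `ev' (the B-event) and `seqno' (a
  running group counter kept by the Coordinator) are hypothetical names.

    Slave_job_group group;
    group.reset(ev->log_pos, seqno);  // everything else gets the defaults above
*/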

/**
  Group Assigned Queue (GAQ) whose first element identifies the first gap
  in the committed sequence. The head of the queue is therefore next to
  the low-water mark.
*/
class Slave_committed_queue : public circular_buffer_queue
{
public:

  bool inited;

  /* executed for the master's Rotate event */
  void update_current_binlog(const char *post_rotate);

  /*
     The Low-Water-Mark as of the last checkpoint time
  */
  Slave_job_group lwm;

  /* the last processed indexes for each Worker */
  DYNAMIC_ARRAY last_done;

  /* the index in the GAQ of the group currently being assigned */
  ulong assigned_group_index;

  Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
                         uint inc= 0)
    : circular_buffer_queue(el_size, max, inc), inited(FALSE)
  {
    uint k;
    ulonglong l= 0;

    if (max >= (ulong) -1 || !circular_buffer_queue::inited_queue)
      return;
    else
      inited= TRUE;
    my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
    for (k= 0; k < n; k++)
      insert_dynamic(&last_done, (uchar*) &l);  // empty for each Worker
    lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
    lwm.group_relay_log_name[0]= 0;
  }

  ~Slave_committed_queue ()
  {
    if (inited)
    {
      delete_dynamic(&last_done);
      my_free(lwm.group_relay_log_name);
      free_dynamic_items();  // free possibly left allocated strings in the GAQ list
    }
  }

#ifndef DBUG_OFF
  bool count_done(Relay_log_info* rli);
#endif

  /* The checkpoint routine refreshes the queue */
  ulong move_queue_head(DYNAMIC_ARRAY *ws);
  /* Method for slave shutdown-time cleanup */
  void free_dynamic_items();
  /*
     Returns a pointer to the Slave_job_group struct instance indexed by the
     argument in the circular buffer dyn-array.
  */
  Slave_job_group* get_job_group(ulong ind)
  {
    return (Slave_job_group*) dynamic_array_ptr(&Q, ind);
  }

  /**
     Assigns @c assigned_group_index to the index of the enqueued item
     and returns it.
  */
  ulong en_queue(void *item)
  {
    return assigned_group_index= circular_buffer_queue::en_queue(item);
  }

};
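
/*
  Illustrative sketch only: the Coordinator-side life cycle of a GAQ slot.
  `rli' stands for the Coordinator's Relay_log_info (which owns `gaq', as
  referenced by Slave_worker below), `group' for a prepared Slave_job_group
  and `ws' for the Worker array; all three names are hypothetical here.

    // At group start: enqueue the prepared descriptor, remembering its index.
    ulong gaq_idx= rli->gaq->en_queue(&group);   // also sets assigned_group_index

    // Whenever the slot is needed again, e.g. to pass it to a Worker:
    Slave_job_group *ptr_g= rli->gaq->get_job_group(gaq_idx);

    // At checkpoint time: collect completed groups from the head and
    // advance the low-water mark lwm.
    ulong cnt= rli->gaq->move_queue_head(ws);
*/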

class Slave_jobs_queue : public circular_buffer_queue
{
public:

  /*
     The Coordinator marks it with TRUE; the Worker signals back once the
     queue becomes available again.
  */
  bool overfill;
  ulonglong waited_overfill;
};

class Slave_worker : public Relay_log_info
{
public:
  Slave_worker(Relay_log_info *rli
#ifdef HAVE_PSI_INTERFACE
               ,PSI_mutex_key *param_key_info_run_lock,
               PSI_mutex_key *param_key_info_data_lock,
               PSI_mutex_key *param_key_info_sleep_lock,
               PSI_mutex_key *param_key_info_data_cond,
               PSI_mutex_key *param_key_info_start_cond,
               PSI_mutex_key *param_key_info_stop_cond,
               PSI_mutex_key *param_key_info_sleep_cond
#endif
               , uint param_id
              );
  virtual ~Slave_worker();

  Slave_jobs_queue jobs;   // assignment queue containing events to execute
  mysql_mutex_t jobs_lock; // mutex for the jobs queue
  mysql_cond_t  jobs_cond; // condition variable for the jobs queue
  Relay_log_info *c_rli;   // pointer to the Coordinator's rli
  DYNAMIC_ARRAY curr_group_exec_parts; // Current Group Executed Partitions
  bool curr_group_seen_begin; // is set to TRUE with an explicit B-event
  ulong id;                 // numeric identifier of the Worker

  /*
    Worker runtime statistics
  */
  // the index in the GAQ of the last group processed by this Worker
  volatile ulong last_group_done_index;
  ulonglong last_groups_assigned_index; // index of the previous group assigned to the Worker
  ulong wq_empty_waits;  // how many times the Worker went idle
  ulong events_done;     // how many events (statements) were processed
  ulong groups_done;     // how many groups (transactions) were processed
  volatile int curr_jobs; // number of active assignments
  // number of partitions allocated to the Worker at a point in time
  long usage_partition;
  // symmetric to rli->mts_end_group_sets_max_dbs
  bool end_group_sets_max_dbs;

  volatile bool relay_log_change_notified; // Coord sets and resets, W can read
  volatile bool checkpoint_notified; // Coord sets and resets, W can read
  volatile bool master_log_change_notified; // Coord sets and resets, W can read
  ulong bitmap_shifted;  // amount to shift the last bitmap by at receiving a new CP
  // WQ current excess above the overrun level
  long wq_overrun_cnt;
  /*
    The number of events starting from which the Worker queue is regarded as
    close to full. The number of events in excess yields a weight factor
    used to compute the Coordinator's nap.
  */
  ulong overrun_level;
  /*
     The reverse of overrun: the number of events below which the Worker is
     considered to be underrunning.
  */
  ulong underrun_level;
  /*
    Total of increments done to rli->mts_wq_excess_cnt on behalf of this Worker.
    When the WQ length drops below the overrun level the counter is reset.
  */
  ulong excess_cnt;
  /*
    Coordinates of the last CheckPoint (CP) this Worker has
    acknowledged; part of its persistent data
  */
  char checkpoint_relay_log_name[FN_REFLEN];
  ulonglong checkpoint_relay_log_pos;
  char checkpoint_master_log_name[FN_REFLEN];
  ulonglong checkpoint_master_log_pos;
  MY_BITMAP group_executed; // bitmap describing the groups executed after the last CP
  MY_BITMAP group_shifted;  // temporary bitmap to compute group_executed
  ulong checkpoint_seqno;   // the most significant ON bit in group_executed
  enum en_running_state
  {
    NOT_RUNNING= 0,
    RUNNING= 1,
    ERROR_LEAVING= 2,         // is set by the Worker
    STOP= 3,                  // is set by the Coordinator upon receiving STOP
    STOP_ACCEPTED= 4          // is set by the Worker upon completing its job when STOP SLAVE is issued
  };
  /*
    The running status is guarded by the jobs_lock mutex, which a writer,
    the Coordinator or the Worker itself, needs to hold when writing a new value.
  */
  en_running_state volatile running_status;
  /*
    exit_incremented indicates whether the Worker has contributed to the max
    updated index. By default it is set to false. When the Worker contributes
    for the first time this variable is set to true.
  */
  bool exit_incremented;

  int init_worker(Relay_log_info*, ulong);
  int rli_init_info(bool);
  int flush_info(bool force= FALSE);
  static size_t get_number_worker_fields();
  void slave_worker_ends_group(Log_event*, int);
  const char *get_master_log_name();
  ulonglong get_master_log_pos() { return master_log_pos; };
  ulonglong set_master_log_pos(ulong val) { return master_log_pos= val; };
  bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
  /*
    When a commit fails, clear the bitmap for the executed worker group and
    revert the positions to the old positions that existed before the commit,
    using the checkpoint.

    @param ptr_g a pointer to the Slave_job_group struct instance which
    holds the group master log pos, group relay log pos and checkpoint positions.
  */
  void rollback_positions(Slave_job_group *ptr_g);
  bool reset_recovery_info();
  /**
     Differs from the parent method in that it does not delete
     rli_description_event.
     The method is run by the Coordinator when Workers are synched or being
     destroyed.
  */
  void set_rli_description_event(Format_description_log_event *fdle)
  {
    DBUG_ASSERT(!fdle || (running_status == Slave_worker::RUNNING && info_thd));
#ifndef DBUG_OFF
    if (fdle)
      mysql_mutex_assert_owner(&jobs_lock);
#endif

    if (fdle)
      adapt_to_master_version(fdle);
    rli_description_event= fdle;
  }

  inline void reset_gaq_index() { gaq_index= c_rli->gaq->size; };
  inline void set_gaq_index(ulong val)
  {
    if (gaq_index == c_rli->gaq->size)
      gaq_index= val;
  };

protected:

  virtual void do_report(loglevel level, int err_code,
                         const char *msg, va_list v_args) const;

private:
  ulong gaq_index;          // GAQ index of the current assignment
  ulonglong master_log_pos; // event's cached log_pos for a possible error report
  void end_info();
  bool read_info(Rpl_info_handler *from);
  bool write_info(Rpl_info_handler *to);
  Slave_worker& operator=(const Slave_worker& info);
  Slave_worker(const Slave_worker& info);
};
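
/*
  Illustrative sketch only: the intended locking discipline around a Worker's
  jobs queue. The helper below is hypothetical (the real append/dequeue
  routines live in the .cc file); it only shows that a writer holds jobs_lock
  and wakes the Worker through jobs_cond.

    static bool append_job(Slave_worker *w, Slave_job_item *item)
    {
      bool ok;
      mysql_mutex_lock(&w->jobs_lock);
      ok= (w->jobs.en_queue(item) < w->jobs.size); // index in [0, size) on success
      mysql_cond_signal(&w->jobs_cond);            // wake the Worker if it waits for work
      mysql_mutex_unlock(&w->jobs_lock);
      return ok;
    }
*/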

void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item);
bool set_max_updated_index_on_stop(Slave_worker *worker,
                                   Slave_job_item *job_item);

TABLE* mts_move_temp_table_to_entry(TABLE*, THD*, db_worker_hash_entry*);
TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*);
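
/*
  Illustrative sketch only: returning the temporary tables attached to an APH
  entry (see db_worker_hash_entry above) to the Coordinator's THD, e.g. at
  slave stop. `thd' and `entry' are hypothetical names.

    mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
    entry->temporary_tables= NULL;  // the entry no longer owns the list
*/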
#endif // HAVE_REPLICATION
#endif