/* Copyright (c) 2011, 2015, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */

#ifndef RPL_RLI_PDB_H
#define RPL_RLI_PDB_H

#ifdef HAVE_REPLICATION

#include "sql_string.h"
#include "rpl_rli.h"
#include <my_sys.h>
#include <my_bitmap.h>
#include "rpl_slave.h"

/**
  Legends running throughout the module:

  C  - Coordinator
  CP - checkpoint
  W  - Worker

  B-event event that Begins a group (a transaction)
  T-event event that Terminates a group (a transaction)
*/

/* Assigned Partition Hash (APH) entry */
typedef struct st_db_worker_hash_entry
{
  uint  db_len;
  const char *db;
  Slave_worker *worker;
  /*
    The number of transaction pending on this database.
    This should only be modified under the lock slave_worker_hash_lock.
  */
  long usage;
  /*
    The list of temp tables belonging to @ db database is
    attached to an assigned @c worker to become its thd->temporary_tables.
    The list is updated with every ddl incl CREATE, DROP.
    It is removed from the entry and merged to the coordinator's
    thd->temporary_tables in case of events: slave stops, APH oversize.
  */
  TABLE* volatile temporary_tables;

  /* todo: relax concurrency to mimic record-level locking.
     That is to augmenting the entry with mutex/cond pair
     pthread_mutex_t
     pthread_cond_t
     timestamp updated_at; */

} db_worker_hash_entry;

bool init_hash_workers(ulong slave_parallel_workers);
void destroy_hash_workers(Relay_log_info*);
Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
                               db_worker_hash_entry **ptr_entry,
                               bool need_temp_tables, Slave_worker *w);
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
int wait_for_workers_to_finish(Relay_log_info const *rli,
                               Slave_worker *ignore= NULL);

#define SLAVE_INIT_DBS_IN_GROUP 4     // initial allocation for CGEP dynarray

#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2

typedef struct slave_job_item
{
  void *data;
} Slave_job_item;

/**
   The class defines a type of queue with a predefined max size that is
   implemented using the circular memory buffer.
   That is items of the queue are accessed as indexed elements of
   the array buffer in a way that when the index value reaches
   a max value it wraps around to point to the first buffer element.
*/
class circular_buffer_queue
{
public:

  DYNAMIC_ARRAY Q;
  ulong size;           // the Size of the queue in terms of element
  ulong avail;          // first Available index to append at (next to tail)
  ulong entry;          // the head index or the entry point to the queue.
  volatile ulong len;   // actual length
  bool inited_queue;

  circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
    size(max), avail(0), entry(max), len(0), inited_queue(FALSE)
  {
    DBUG_ASSERT(size < (ulong) -1);
    if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
      inited_queue= TRUE;
  }
  circular_buffer_queue () : inited_queue(FALSE) {}
  ~circular_buffer_queue ()
  {
    if (inited_queue)
      delete_dynamic(&Q);
  }

  /**
     Content of the being dequeued item is copied to the arg-pointer
     location.

     @return the queue's array index that the de-queued item
             located at, or
             an error encoded in beyond the index legacy range.
  */
  ulong de_queue(uchar *);
  /**
     Similar to de_queue but extracting happens from the tail side.
  */
  ulong de_tail(uchar *val);

  /**
    return the index where the arg item locates
            or an error encoded as a value in beyond of the legacy range
            [0, size) (value `size' is excluded).
  */
  ulong en_queue(void *item);
  /**
     return the value of @c data member of the head of the queue.
  */
  void* head_queue();
  bool   gt(ulong i, ulong k); // comparison of ordering of two entities
  /* index is within the valid range */
  bool in(ulong k) { return !empty() &&
      (entry > avail ? (k >= entry || k < avail) : (k >= entry && k < avail)); }
  bool empty() { return entry == size; }
  bool full() { return avail == size; }
};

typedef struct st_slave_job_group
{
  char *group_master_log_name;   // (actually redundant)
  /*
    T-event lop_pos filled by Worker for CheckPoint (CP)
  */
  my_off_t group_master_log_pos;

  /*
     When relay-log name changes  allocates and fill in a new name of relay-log,
     otherwise it fills in NULL.
     Coordinator keeps track of each Worker has been notified on the updating
     to make sure the routine runs once per change.

     W checks the value at commit and memorizes a not-NULL.
     Freeing unless NULL is left to Coordinator at CP.
  */
  char *group_relay_log_name; // The value is last seen relay-log
  my_off_t group_relay_log_pos;  // filled by W
  ulong worker_id;
  Slave_worker *worker;
  ulonglong total_seqno;

  my_off_t master_log_pos;       // B-event log_pos
  /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
  uint  checkpoint_seqno;
  my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
  char*    checkpoint_log_name;
  my_off_t checkpoint_relay_log_pos; // T-event lop_pos filled by W for CheckPoint
  char*    checkpoint_relay_log_name;
  volatile uchar done;  // Flag raised by W,  read and reset by Coordinator
  ulong    shifted;     // shift the last CP bitmap at receiving a new CP
  time_t   ts;          // Group's timestamp to update Seconds_behind_master
#ifndef DBUG_OFF
  bool     notified;    // to debug group_master_log_name change notification
#endif
  /*
    Coordinator fills the struct with defaults and options at starting of
    a group distribution.
  */
  void reset(my_off_t master_pos, ulonglong seqno)
  {
    master_log_pos= master_pos;
    group_master_log_pos= group_relay_log_pos= 0;
    group_master_log_name= NULL; // todo: remove
    group_relay_log_name= NULL;
    worker_id= MTS_WORKER_UNDEF;
    total_seqno= seqno;
    checkpoint_log_name= NULL;
    checkpoint_log_pos= 0;
    checkpoint_relay_log_name= NULL;
    checkpoint_relay_log_pos= 0;
    checkpoint_seqno= (uint) -1;
    done= 0;
#ifndef DBUG_OFF
    notified= false;
#endif
  }
} Slave_job_group;

/**
  Group Assigned Queue whose first element identifies first gap
  in committed sequence. The head of the queue is therefore next to
  the low-water-mark.
*/
class Slave_committed_queue : public circular_buffer_queue
{
public:

  bool inited;

  /* master's Rot-ev exec */
  void update_current_binlog(const char *post_rotate);

  /*
    The last checkpoint time Low-Water-Mark
  */
  Slave_job_group lwm;

  /* last time processed indexes for each worker */
  DYNAMIC_ARRAY last_done;

  /* the being assigned group index in GAQ */
  ulong assigned_group_index;

  Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
                         uint inc= 0)
    : circular_buffer_queue(el_size, max, inc), inited(FALSE)
  {
    uint k;
    ulonglong l= 0;

    if (max >= (ulong) -1 || !circular_buffer_queue::inited_queue)
      return;
    else
      inited= TRUE;
    my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
    for (k= 0; k < n; k++)
      insert_dynamic(&last_done, (uchar*) &l);  // empty for each Worker
    lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
    lwm.group_relay_log_name[0]= 0;
  }

  ~Slave_committed_queue ()
  {
    if (inited)
    {
      delete_dynamic(&last_done);
      my_free(lwm.group_relay_log_name);
      free_dynamic_items();  // free possibly left allocated strings in GAQ list
    }
  }

#ifndef DBUG_OFF
  bool count_done(Relay_log_info* rli);
#endif

  /* Checkpoint routine refreshes the queue */
  ulong move_queue_head(DYNAMIC_ARRAY *ws);
  /* Method is for slave shutdown time cleanup */
  void free_dynamic_items();
  /*
     returns a pointer to Slave_job_group struct instance as indexed by arg
     in the circular buffer dyn-array
  */
  Slave_job_group* get_job_group(ulong ind)
  {
    return (Slave_job_group*) dynamic_array_ptr(&Q, ind);
  }

  /**
     Assigns @c assigned_group_index to an index of enqueued item
     and returns it.
  */
  ulong en_queue(void *item)
  {
    return assigned_group_index= circular_buffer_queue::en_queue(item);
  }

};

class Slave_jobs_queue : public circular_buffer_queue
{
public:

  /*
     Coordinator marks with true, Worker signals back at queue back to
     available
  */
  bool overfill;
  ulonglong waited_overfill;
};

class Slave_worker : public Relay_log_info
{
public:
  Slave_worker(Relay_log_info *rli
#ifdef HAVE_PSI_INTERFACE
               ,PSI_mutex_key *param_key_info_run_lock,
               PSI_mutex_key *param_key_info_data_lock,
               PSI_mutex_key *param_key_info_sleep_lock,
               PSI_mutex_key *param_key_info_data_cond,
               PSI_mutex_key *param_key_info_start_cond,
               PSI_mutex_key *param_key_info_stop_cond,
               PSI_mutex_key *param_key_info_sleep_cond
#endif
               , uint param_id
              );
  virtual ~Slave_worker();

  Slave_jobs_queue jobs;   // assignment queue containing events to execute
  mysql_mutex_t jobs_lock; // mutex for the jobs queue
  mysql_cond_t  jobs_cond; // condition variable for the jobs queue
  Relay_log_info *c_rli;   // pointer to Coordinator's rli
  DYNAMIC_ARRAY curr_group_exec_parts; // Current Group Executed Partitions
  bool curr_group_seen_begin; // is set to TRUE with explicit B-event
  ulong id;                 // numeric identifier of the Worker

  /*
    Worker runtime statistics
  */
  // the index in GAQ of the last processed group by this Worker
  volatile ulong last_group_done_index;
  ulonglong last_groups_assigned_index; // index of previous group assigned to worker
  ulong wq_empty_waits;  // how many times got idle
  ulong events_done;     // how many events (statements) processed
  ulong groups_done;     // how many groups (transactions) processed
  volatile int curr_jobs; // number of active  assignments
  // number of partitions allocated to the worker at point in time
  long usage_partition;
  // symmetric to rli->mts_end_group_sets_max_dbs
  bool end_group_sets_max_dbs;

  volatile bool relay_log_change_notified; // Coord sets and resets, W can read
  volatile bool checkpoint_notified; // Coord sets and resets, W can read
  volatile bool master_log_change_notified; // Coord sets and resets, W can read
  ulong bitmap_shifted;  // shift the last bitmap at receiving new CP
  // WQ current excess above the overrun level
  long wq_overrun_cnt;
  /*
    number of events starting from which Worker queue is regarded as
    close to full. The number of the excessive events yields a weight factor
    to compute Coordinator's nap.
  */
  ulong overrun_level;
  /*
     reverse to overrun: the number of events below which Worker is
     considered underrunning
  */
  ulong underrun_level;
  /*
    Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
    When WQ length is dropped below overrun the counter is reset.
  */
  ulong excess_cnt;
  /*
    Coordinates of the last CheckPoint (CP) this Worker has
    acknowledged; part of is persistent data
  */
  char checkpoint_relay_log_name[FN_REFLEN];
  ulonglong checkpoint_relay_log_pos;
  char checkpoint_master_log_name[FN_REFLEN];
  ulonglong checkpoint_master_log_pos;
  MY_BITMAP group_executed; // bitmap describes groups executed after last CP
  MY_BITMAP group_shifted;  // temporary bitmap to compute group_executed
  ulong checkpoint_seqno;   // the most significant ON bit in group_executed
  enum en_running_state
  {
    NOT_RUNNING= 0,
    RUNNING= 1,
    ERROR_LEAVING= 2,         // is set by Worker
    STOP= 3,                  // is set by Coordinator upon receiving STOP
    STOP_ACCEPTED= 4          // is set by worker upon completing job when STOP SLAVE is issued
  };
  /*
    The running status is guarded by jobs_lock mutex that a writer
    Coordinator or Worker itself needs to hold when write a new value.
  */
  en_running_state volatile running_status;
  /*
    exit_incremented indicates whether worker has contributed to max updated index.
    By default it is set to false. When the worker contributes for the first time this
    variable is set to true.
  */
  bool exit_incremented;

  int init_worker(Relay_log_info*, ulong);
  int rli_init_info(bool);
  int flush_info(bool force= FALSE);
  static size_t get_number_worker_fields();
  void slave_worker_ends_group(Log_event*, int);
  const char *get_master_log_name();
  ulonglong get_master_log_pos() { return master_log_pos; };
  ulonglong set_master_log_pos(ulong val) { return master_log_pos= val; };
  bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
  /*
    When commit fails clear bitmap for executed worker group. Revert back the
    positions to the old positions that existed before commit using the checkpoint.

    @param Slave_job_group a pointer to Slave_job_group struct instance which
    holds group master log pos, group relay log pos and checkpoint positions.
  */
  void rollback_positions(Slave_job_group *ptr_g);
  bool reset_recovery_info();
  /**
     Different from the parent method in that this does not delete
     rli_description_event.
     The method runs by Coordinator when Worker are synched or being
     destroyed.
  */
  void set_rli_description_event(Format_description_log_event *fdle)
  {
    DBUG_ASSERT(!fdle || (running_status == Slave_worker::RUNNING && info_thd));
#ifndef DBUG_OFF
    if (fdle)
      mysql_mutex_assert_owner(&jobs_lock);
#endif

    if (fdle)
      adapt_to_master_version(fdle);
    rli_description_event= fdle;
  }

  inline void reset_gaq_index() { gaq_index= c_rli->gaq->size; };
  inline void set_gaq_index(ulong val)
  {
    if (gaq_index == c_rli->gaq->size)
      gaq_index= val;
  };

protected:

  virtual void do_report(loglevel level, int err_code,
                         const char *msg, va_list v_args) const;

private:
  ulong gaq_index;          // GAQ index of the current assignment
  ulonglong master_log_pos; // event's cached log_pos for possible error report
  void end_info();
  bool read_info(Rpl_info_handler *from);
  bool write_info(Rpl_info_handler *to);
  Slave_worker& operator=(const Slave_worker& info);
  Slave_worker(const Slave_worker& info);
};

void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item);
bool set_max_updated_index_on_stop(Slave_worker *worker,
                                   Slave_job_item *job_item);

TABLE* mts_move_temp_table_to_entry(TABLE*, THD*, db_worker_hash_entry*);
TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*);
#endif // HAVE_REPLICATION
#endif