/* Copyright (c) 2011, 2015, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */

#ifndef RPL_RLI_PDB_H

#define RPL_RLI_PDB_H

#ifdef HAVE_REPLICATION

#include "sql_string.h"
#include "rpl_rli.h"
#include <my_sys.h>
#include <my_bitmap.h>
#include "rpl_slave.h"

/**
  Legends running throughout the module:

  C  - Coordinator
  CP - checkpoint
  W  - Worker

  B-event event that Begins a group (a transaction)
  T-event event that Terminates a group (a transaction)
*/

/* Assigned Partition Hash (APH) entry */
typedef struct st_db_worker_hash_entry
{
  uint  db_len;
  const char *db;
  Slave_worker *worker;
  /*
    The number of transactions pending on this database.
    This should only be modified under the lock slave_worker_hash_lock.
  */
  long usage;
  /*
    The list of temp tables belonging to @c db database is
    attached to an assigned @c worker to become its thd->temporary_tables.
    The list is updated with every DDL incl CREATE, DROP.
    It is removed from the entry and merged to the coordinator's
    thd->temporary_tables in case of events: slave stops, APH oversize.
  */
  TABLE* volatile temporary_tables;

  /* todo: relax concurrency to mimic record-level locking.
     That is to augment the entry with mutex/cond pair
     pthread_mutex_t
     pthread_cond_t
     timestamp updated_at; */

} db_worker_hash_entry;

bool init_hash_workers(ulong slave_parallel_workers);
void destroy_hash_workers(Relay_log_info*);
Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
                               db_worker_hash_entry **ptr_entry,
                               bool need_temp_tables, Slave_worker *w);
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
int wait_for_workers_to_finish(Relay_log_info const *rli,
                               Slave_worker *ignore= NULL);

#define SLAVE_INIT_DBS_IN_GROUP 4     // initial allocation for CGEP dynarray

#define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2

typedef struct slave_job_item
{
  void *data;
} Slave_job_item;

/**
   The class defines a type of queue with a predefined max size that is
   implemented using the circular memory buffer.
   That is items of the queue are accessed as indexed elements of
   the array buffer in a way that when the index value reaches
   a max value it wraps around to point to the first buffer element.
*/
class circular_buffer_queue
{
public:

  DYNAMIC_ARRAY Q;
  ulong size;           // the Size of the queue in terms of element
  ulong avail;          // first Available index to append at (next to tail)
  ulong entry;          // the head index or the entry point to the queue.
  volatile ulong len;   // actual length
  bool inited_queue;

  circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
    size(max), avail(0), entry(max), len(0), inited_queue(FALSE)
  {
    DBUG_ASSERT(size < (ulong) -1);
    if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
      inited_queue= TRUE;
  }
  circular_buffer_queue () : inited_queue(FALSE) {}
  ~circular_buffer_queue ()
  {
    if (inited_queue)
      delete_dynamic(&Q);
  }

  /**
     Content of the being dequeued item is copied to the arg-pointer
     location.

     @return the queue's array index that the de-queued item
             located at, or
             an error encoded in beyond the index legacy range.
  */
  ulong de_queue(uchar *);
  /**
     Similar to de_queue but extracting happens from the tail side.
  */
  ulong de_tail(uchar *val);

  /**
    return the index where the arg item locates
            or an error encoded as a value in beyond of the legacy range
            [0, size) (value `size' is excluded).
  */
  ulong en_queue(void *item);
  /**
     return the value of @c data member of the head of the queue.
  */
  void* head_queue();
  bool gt(ulong i, ulong k); // comparison of ordering of two entities
  /* index is within the valid range */
  bool in(ulong k) { return !empty() &&
    (entry > avail ? (k >= entry || k < avail) : (k >= entry && k < avail)); }
  bool empty() { return entry == size; }
  bool full() { return avail == size; }
};

typedef struct st_slave_job_group
{
  char *group_master_log_name;   // (actually redundant)
  /*
    T-event lop_pos filled by Worker for CheckPoint (CP)
  */
  my_off_t group_master_log_pos;

  /*
     When relay-log name changes allocates and fill in a new name of relay-log,
     otherwise it fills in NULL.
     Coordinator keeps track of each Worker has been notified on the updating
     to make sure the routine runs once per change.

     W checks the value at commit and memorizes a not-NULL.
     Freeing unless NULL is left to Coordinator at CP.
  */
  char *group_relay_log_name; // The value is last seen relay-log
  my_off_t group_relay_log_pos;  // filled by W
  ulong worker_id;
  Slave_worker *worker;
  ulonglong total_seqno;

  my_off_t master_log_pos;       // B-event log_pos
  /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
  uint  checkpoint_seqno;
  my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
  char*    checkpoint_log_name;
  my_off_t checkpoint_relay_log_pos; // T-event lop_pos filled by W for CheckPoint
  char*    checkpoint_relay_log_name;
  volatile uchar done;  // Flag raised by W,  read and reset by Coordinator
  ulong    shifted;     // shift the last CP bitmap at receiving a new CP
  time_t   ts;          // Group's timestamp to update Seconds_behind_master
#ifndef DBUG_OFF
  bool     notified;    // to debug group_master_log_name change notification
#endif
  /*
    Coordinator fills the struct with defaults and options at starting of
    a group distribution.
  */
  void reset(my_off_t master_pos, ulonglong seqno)
  {
    master_log_pos= master_pos;
    group_master_log_pos= group_relay_log_pos= 0;
    group_master_log_name= NULL; // todo: remove
    group_relay_log_name= NULL;
    worker_id= MTS_WORKER_UNDEF;
    total_seqno= seqno;
    checkpoint_log_name= NULL;
    checkpoint_log_pos= 0;
    checkpoint_relay_log_name= NULL;
    checkpoint_relay_log_pos= 0;
    checkpoint_seqno= (uint) -1;
    done= 0;
    ts= 0;
#ifndef DBUG_OFF
    notified= false;
#endif
  }
} Slave_job_group;

/**
   Group Assigned Queue whose first element identifies first gap
   in committed sequence. The head of the queue is therefore next to
   the low-water-mark.
*/
class Slave_committed_queue : public circular_buffer_queue
{
public:

  bool inited;

  /* master's Rot-ev exec */
  void update_current_binlog(const char *post_rotate);

  /*
     The last checkpoint time Low-Water-Mark
  */
  Slave_job_group lwm;

  /* last time processed indexes for each worker */
  DYNAMIC_ARRAY last_done;

  /* the being assigned group index in GAQ */
  ulong assigned_group_index;

  Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
                         uint inc= 0)
    : circular_buffer_queue(el_size, max, inc), inited(FALSE)
  {
    uint k;
    ulonglong l= 0;

    if (max >= (ulong) -1 || !circular_buffer_queue::inited_queue)
      return;
    else
      inited= TRUE;
    my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
    for (k= 0; k < n; k++)
      insert_dynamic(&last_done, (uchar*) &l);  // empty for each Worker
    lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
    lwm.group_relay_log_name[0]= 0;
  }

  ~Slave_committed_queue ()
  {
    if (inited)
    {
      delete_dynamic(&last_done);
      my_free(lwm.group_relay_log_name);
      free_dynamic_items();  // free possibly left allocated strings in GAQ list
    }
  }

#ifndef DBUG_OFF
  bool count_done(Relay_log_info* rli);
#endif

  /* Checkpoint routine refreshes the queue */
  ulong move_queue_head(DYNAMIC_ARRAY *ws);
  /* Method is for slave shutdown time cleanup */
  void free_dynamic_items();
  /*
     returns a pointer to Slave_job_group struct instance as indexed by arg
     in the circular buffer dyn-array
  */
  Slave_job_group* get_job_group(ulong ind)
  {
    return (Slave_job_group*) dynamic_array_ptr(&Q, ind);
  }

  /**
     Assigns @c assigned_group_index to an index of enqueued item
     and returns it.
  */
  ulong en_queue(void *item)
  {
    return assigned_group_index= circular_buffer_queue::en_queue(item);
  }

};

class Slave_jobs_queue : public circular_buffer_queue
{
public:

  /*
     Coordinator marks with true, Worker signals back at queue back to
     available
  */
  bool overfill;
  ulonglong waited_overfill;
};

class Slave_worker : public Relay_log_info
{
public:
  Slave_worker(Relay_log_info *rli
#ifdef HAVE_PSI_INTERFACE
               ,PSI_mutex_key *param_key_info_run_lock,
               PSI_mutex_key *param_key_info_data_lock,
               PSI_mutex_key *param_key_info_sleep_lock,
               PSI_mutex_key *param_key_info_data_cond,
               PSI_mutex_key *param_key_info_start_cond,
               PSI_mutex_key *param_key_info_stop_cond,
               PSI_mutex_key *param_key_info_sleep_cond
#endif
               , uint param_id
              );
  virtual ~Slave_worker();

  Slave_jobs_queue jobs;   // assignment queue containing events to execute
  mysql_mutex_t jobs_lock; // mutex for the jobs queue
  mysql_cond_t  jobs_cond; // condition variable for the jobs queue
  Relay_log_info *c_rli;   // pointer to Coordinator's rli
  DYNAMIC_ARRAY curr_group_exec_parts; // Current Group Executed Partitions
  bool curr_group_seen_begin; // is set to TRUE with explicit B-event
  ulong id;                 // numeric identifier of the Worker

  /*
     Worker runtime statistics
  */
  // the index in GAQ of the last processed group by this Worker
  volatile ulong last_group_done_index;
  ulonglong last_groups_assigned_index; // index of previous group assigned to worker
  ulong wq_empty_waits;  // how many times got idle
  ulong events_done;     // how many events (statements) processed
  ulong groups_done;     // how many groups (transactions) processed
  volatile int curr_jobs; // number of active assignments
  // number of partitions allocated to the worker at point in time
  long usage_partition;
  // symmetric to rli->mts_end_group_sets_max_dbs
  bool end_group_sets_max_dbs;

  volatile bool relay_log_change_notified; // Coord sets and resets, W can read
  volatile bool checkpoint_notified; // Coord sets and resets, W can read
  volatile bool master_log_change_notified; // Coord sets and resets, W can read
  ulong bitmap_shifted;  // shift the last bitmap at receiving new CP
  // WQ current excess above the overrun level
  long wq_overrun_cnt;
  /*
     number of events starting from which Worker queue is regarded as
     close to full. The number of the excessive events yields a weight factor
     to compute Coordinator's nap.
  */
  ulong overrun_level;
  /*
     reverse to overrun: the number of events below which Worker is
     considered underrunning
  */
  ulong underrun_level;
  /*
    Total of increments done to rli->mts_wq_excess_cnt on behalf of this worker.
    When WQ length is dropped below overrun the counter is reset.
  */
  ulong excess_cnt;
  /*
    Coordinates of the last CheckPoint (CP) this Worker has
    acknowledged; part of is persistent data
  */
  char checkpoint_relay_log_name[FN_REFLEN];
  ulonglong checkpoint_relay_log_pos;
  char checkpoint_master_log_name[FN_REFLEN];
  ulonglong checkpoint_master_log_pos;
  MY_BITMAP group_executed; // bitmap describes groups executed after last CP
  MY_BITMAP group_shifted;  // temporary bitmap to compute group_executed
  ulong checkpoint_seqno;   // the most significant ON bit in group_executed
  enum en_running_state
  {
    NOT_RUNNING= 0,
    RUNNING= 1,
    ERROR_LEAVING= 2,         // is set by Worker
    STOP= 3,                  // is set by Coordinator upon receiving STOP
    STOP_ACCEPTED= 4          // is set by worker upon completing job when STOP SLAVE is issued
  };
  /*
    The running status is guarded by jobs_lock mutex that a writer
    Coordinator or Worker itself needs to hold when write a new value.
  */
  en_running_state volatile running_status;
  /*
    exit_incremented indicates whether worker has contributed to max updated index.
    By default it is set to false. When the worker contributes for the first time this
    variable is set to true.
  */
  bool exit_incremented;

  int init_worker(Relay_log_info*, ulong);
  int rli_init_info(bool);
  int flush_info(bool force= FALSE);
  static size_t get_number_worker_fields();
  void slave_worker_ends_group(Log_event*, int);
  const char *get_master_log_name();
  ulonglong get_master_log_pos() { return master_log_pos; };
  // NOTE(review): parameter is ulong while the member is ulonglong; on LP64
  // this narrows positions > 4G at call sites passing ulong — confirm callers.
  ulonglong set_master_log_pos(ulong val) { return master_log_pos= val; };
  bool commit_positions(Log_event *evt, Slave_job_group *ptr_g, bool force);
  /*
    When commit fails clear bitmap for executed worker group. Revert back the
    positions to the old positions that existed before commit using the checkpoint.

    @param Slave_job_group  a pointer to Slave_job_group struct instance which
    holds group master log pos, group relay log pos and checkpoint positions.
  */
  void rollback_positions(Slave_job_group *ptr_g);
  bool reset_recovery_info();
  /**
     Different from the parent method in that this does not delete
     rli_description_event.
     The method runs by Coordinator when Worker are synched or being
     destroyed.
  */
  void set_rli_description_event(Format_description_log_event *fdle)
  {
    DBUG_ASSERT(!fdle || (running_status == Slave_worker::RUNNING && info_thd));
#ifndef DBUG_OFF
    if (fdle)
      mysql_mutex_assert_owner(&jobs_lock);
#endif

    if (fdle)
      adapt_to_master_version(fdle);
    rli_description_event= fdle;
  }

  inline void reset_gaq_index() { gaq_index= c_rli->gaq->size; };
  inline void set_gaq_index(ulong val)
  {
    if (gaq_index == c_rli->gaq->size)
      gaq_index= val;
  };

  // overridden new and delete operators for 64 byte alignment
  static void* operator new(size_t request);
  static void operator delete(void * ptr);

protected:

  virtual void do_report(loglevel level, int err_code,
                         const char *msg, va_list v_args) const;

private:
  ulong gaq_index;          // GAQ index of the current assignment
  ulonglong master_log_pos; // event's cached log_pos for possible error report
  void end_info();
  bool read_info(Rpl_info_handler *from);
  bool write_info(Rpl_info_handler *to);
  Slave_worker& operator=(const Slave_worker& info);
  Slave_worker(const Slave_worker& info);
};

void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
bool handle_slave_worker_stop(Slave_worker *worker, Slave_job_item *job_item);
bool set_max_updated_index_on_stop(Slave_worker *worker,
                                   Slave_job_item *job_item);

TABLE* mts_move_temp_table_to_entry(TABLE*, THD*, db_worker_hash_entry*);
TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*);
#endif // HAVE_REPLICATION
#endif