1 // 2 // Copyright (C) 2010-2013 Codership Oy 3 // 4 5 #ifndef GALERA_MONITOR_HPP 6 #define GALERA_MONITOR_HPP 7 8 #include "trx_handle.hpp" 9 #include <gu_lock.hpp> // for gu::Mutex and gu::Cond 10 #include <gu_limits.h> 11 12 #include <vector> 13 14 namespace galera 15 { 16 template <class C> 17 class Monitor 18 { 19 private: 20 21 struct Process 22 { Processgalera::Monitor::Process23 Process() : obj_(0), cond_(), wait_cond_(), state_(S_IDLE) 24 #ifndef NDEBUG 25 ,dobj_() 26 #endif /* NDEBUG */ 27 { } 28 29 const C* obj_; 30 gu::Cond cond_; 31 gu::Cond wait_cond_; 32 enum State 33 { 34 S_IDLE, // Slot is free 35 S_WAITING, // Waiting to enter applying critical section 36 S_CANCELED, 37 S_APPLYING, // Applying 38 S_FINISHED // Finished 39 } state_; 40 #ifndef NDEBUG 41 C dobj_; 42 #endif /* NDEBUG */ 43 44 private: 45 46 // non-copyable 47 Process(const Process& other); 48 void operator=(const Process&); 49 }; 50 51 static const ssize_t process_size_ = (1ULL << 16); 52 static const size_t process_mask_ = process_size_ - 1; 53 public: 54 Monitor()55 Monitor() 56 : 57 mutex_(), 58 cond_(), 59 uuid_(WSREP_UUID_UNDEFINED), 60 last_entered_(-1), 61 last_left_(-1), 62 drain_seqno_(GU_LLONG_MAX), 63 process_(new Process[process_size_]), 64 entered_(0), 65 oooe_(0), 66 oool_(0), 67 win_size_(0), 68 waits_(0) 69 { } 70 ~Monitor()71 ~Monitor() 72 { 73 delete[] process_; 74 if (entered_ > 0) 75 { 76 log_info << "mon: entered " << entered_ 77 << " oooe fraction " << double(oooe_)/entered_ 78 << " oool fraction " << double(oool_)/entered_; 79 } 80 else 81 { 82 log_info << "apply mon: entered 0"; 83 } 84 } 85 86 /* 87 * For ordered CC events this had to be changed: 88 * - it either resets position to -1 or 89 * - merely advances it to seqno if current position is behind. 90 * Assumes that monitor has been drained. 91 */ set_initial_position(const wsrep_uuid_t & uuid,wsrep_seqno_t const seqno)92 void set_initial_position(const wsrep_uuid_t& uuid, 93 wsrep_seqno_t const seqno) 94 { 95 gu::Lock lock(mutex_); 96 97 state_debug_print("set_initial_position", seqno); 98 uuid_ = uuid; 99 // When the monitor position is reset, either all the 100 // waiters must have been drained or the thread which is 101 // resetting the position must hold the monitor (CC from IST). 102 // Exception is -1 which means that the monitor is being 103 // forcifully reset. 104 assert(seqno == -1 || last_entered_ == last_left_ || 105 last_entered_ == seqno); 106 if (last_entered_ == -1 || seqno == -1) 107 { 108 // first call or reset 109 last_entered_ = last_left_ = seqno; 110 } 111 else 112 #if 1 // now 113 { 114 if (last_left_ < seqno) last_left_ = seqno; 115 if (last_entered_ < last_left_) last_entered_ = last_left_; 116 } 117 118 // some drainers may wait for us here 119 cond_.broadcast(); 120 #else // before 121 { 122 // drain monitor up to seqno but don't reset last_entered_ 123 // or last_left_ 124 drain_common(seqno, lock); 125 drain_seqno_ = GU_LLONG_MAX; 126 } 127 #endif 128 if (seqno != -1) 129 { 130 const size_t idx(indexof(seqno)); 131 process_[idx].wait_cond_.broadcast(); 132 } 133 } 134 enter(C & obj)135 void enter(C& obj) 136 { 137 const wsrep_seqno_t obj_seqno(obj.seqno()); 138 const size_t idx(indexof(obj_seqno)); 139 gu::Lock lock(mutex_); 140 141 state_debug_print("enter", obj); 142 assert(obj_seqno > last_left_); 143 144 pre_enter(obj, lock); 145 146 if (gu_likely(process_[idx].state_ != Process::S_CANCELED)) 147 { 148 assert(process_[idx].state_ == Process::S_IDLE); 149 150 process_[idx].state_ = Process::S_WAITING; 151 process_[idx].obj_ = &obj; 152 #ifndef NDEBUG 153 process_[idx].dobj_.~C(); 154 new (&process_[idx].dobj_) C(obj); 155 #endif /* NDEBUG */ 156 #ifdef GU_DBUG_ON 157 obj.debug_sync(mutex_); 158 #endif // GU_DBUG_ON 159 while (may_enter(obj) == false && 160 process_[idx].state_ == Process::S_WAITING) 161 { 162 ++waits_; 163 lock.wait(process_[idx].cond_); 164 } 165 166 if (process_[idx].state_ != Process::S_CANCELED) 167 { 168 assert(process_[idx].state_ == Process::S_WAITING || 169 process_[idx].state_ == Process::S_APPLYING); 170 171 process_[idx].state_ = Process::S_APPLYING; 172 173 ++entered_; 174 oooe_ += ((last_left_ + 1) < obj_seqno); 175 win_size_ += (last_entered_ - last_left_); 176 return; 177 } 178 } 179 180 assert(process_[idx].state_ == Process::S_CANCELED); 181 process_[idx].state_ = Process::S_IDLE; 182 183 state_debug_print("enter canceled", obj); 184 gu_throw_error(EINTR); 185 } 186 entered(const C & obj) const187 bool entered(const C& obj) const 188 { 189 return state(obj) == Process::S_APPLYING; 190 } 191 finished(const C & obj) const192 bool finished(const C& obj) const 193 { 194 return state(obj) == Process::S_FINISHED; 195 } 196 canceled(const C & obj) const197 bool canceled(const C& obj) const 198 { 199 return state(obj) == Process::S_CANCELED; 200 } 201 leave(const C & obj)202 void leave(const C& obj) 203 { 204 #ifndef NDEBUG 205 size_t idx(indexof(obj.seqno())); 206 #endif /* NDEBUG */ 207 gu::Lock lock(mutex_); 208 state_debug_print("leave", obj); 209 210 assert(process_[idx].state_ == Process::S_APPLYING || 211 process_[idx].state_ == Process::S_CANCELED); 212 213 assert(process_[indexof(last_left_)].state_ == Process::S_IDLE); 214 215 post_leave(obj.seqno(), lock); 216 } 217 self_cancel(C & obj)218 void self_cancel(C& obj) 219 { 220 wsrep_seqno_t const obj_seqno(obj.seqno()); 221 size_t idx(indexof(obj_seqno)); 222 gu::Lock lock(mutex_); 223 224 state_debug_print("self_cancel", obj); 225 226 assert(obj_seqno > last_left_); 227 228 while (obj_seqno - last_left_ >= process_size_) 229 // TODO: exit on error 230 { 231 log_warn << "Trying to self-cancel seqno out of process " 232 << "space: obj_seqno - last_left_ = " << obj_seqno 233 << " - " << last_left_ << " = " 234 << (obj_seqno - last_left_) 235 << ", process_size_: " << process_size_ 236 << ". Deadlock is very likely."; 237 238 lock.wait(cond_); 239 } 240 241 assert(process_[idx].state_ == Process::S_IDLE || 242 process_[idx].state_ == Process::S_CANCELED); 243 244 #ifndef NDEBUG 245 process_[idx].dobj_.~C(); 246 new (&process_[idx].dobj_) C(obj); 247 #endif /* NDEBUG */ 248 249 if (obj_seqno > last_entered_) last_entered_ = obj_seqno; 250 251 if (obj_seqno <= drain_seqno_) 252 { 253 post_leave(obj.seqno(), lock); 254 } 255 else 256 { 257 process_[idx].state_ = Process::S_FINISHED; 258 } 259 } 260 interrupt(const C & obj)261 bool interrupt(const C& obj) 262 { 263 size_t idx (indexof(obj.seqno())); 264 gu::Lock lock(mutex_); 265 266 while (obj.seqno() - last_left_ >= process_size_) 267 // TODO: exit on error 268 { 269 lock.wait(cond_); 270 } 271 272 state_debug_print("interrupt", obj); 273 274 if ((process_[idx].state_ == Process::S_IDLE && 275 obj.seqno() > last_left_ ) || 276 process_[idx].state_ == Process::S_WAITING ) 277 { 278 process_[idx].state_ = Process::S_CANCELED; 279 process_[idx].cond_.signal(); 280 // since last_left + 1 cannot be <= S_WAITING we're not 281 // modifying a window here. No broadcasting. 282 return true; 283 } 284 else 285 { 286 log_debug << "interrupting " << obj.seqno() 287 << " state " << process_[idx].state_ 288 << " le " << last_entered_ 289 << " ll " << last_left_; 290 } 291 292 return false; 293 } 294 last_left() const295 wsrep_seqno_t last_left() const 296 { 297 gu::Lock lock(mutex_); 298 return last_left_; 299 } 300 last_entered() const301 wsrep_seqno_t last_entered() const 302 { 303 gu::Lock lock(mutex_); 304 return last_entered_; 305 } 306 last_left_gtid(wsrep_gtid_t & gtid) const307 void last_left_gtid(wsrep_gtid_t& gtid) const 308 { 309 gu::Lock lock(mutex_); 310 gtid.uuid = uuid_; 311 gtid.seqno = last_left_; 312 } 313 size() const314 ssize_t size() const { return process_size_; } 315 would_block(wsrep_seqno_t seqno) const316 bool would_block (wsrep_seqno_t seqno) const 317 { 318 return (seqno - last_left_ >= process_size_ || 319 seqno > drain_seqno_); 320 } 321 drain(wsrep_seqno_t seqno)322 void drain(wsrep_seqno_t seqno) 323 { 324 gu::Lock lock(mutex_); 325 326 state_debug_print("drain", seqno); 327 328 while (drain_seqno_ != GU_LLONG_MAX) 329 { 330 lock.wait(cond_); 331 } 332 333 drain_common(seqno, lock); 334 335 // there can be some stale canceled entries 336 update_last_left(); 337 338 drain_seqno_ = GU_LLONG_MAX; 339 cond_.broadcast(); 340 } 341 wait(wsrep_seqno_t seqno)342 void wait(wsrep_seqno_t seqno) 343 { 344 gu::Lock lock(mutex_); 345 while (last_left_ < seqno) 346 { 347 size_t idx(indexof(seqno)); 348 lock.wait(process_[idx].wait_cond_); 349 } 350 } 351 wait(gu::GTID & gtid,const gu::datetime::Date & wait_until)352 void wait(gu::GTID& gtid, const gu::datetime::Date& wait_until) 353 { 354 gu::Lock lock(mutex_); 355 if (gtid.uuid() != uuid_) 356 { 357 throw gu::NotFound(); 358 } 359 while (last_left_ < gtid.seqno()) 360 { 361 size_t idx(indexof(gtid.seqno())); 362 lock.wait(process_[idx].wait_cond_, wait_until); 363 } 364 } 365 get_stats(double * oooe,double * oool,double * win_size,long long * waits) const366 void get_stats(double* oooe, double* oool, double* win_size, 367 long long* waits) const 368 { 369 gu::Lock lock(mutex_); 370 371 if (entered_ > 0) 372 { 373 *oooe = (oooe_ > 0 ? double(oooe_)/entered_ : .0); 374 *oool = (oool_ > 0 ? double(oool_)/entered_ : .0); 375 *win_size = (win_size_ > 0 ? double(win_size_)/entered_ : .0); 376 } 377 else 378 { 379 *oooe = .0; *oool = .0; *win_size = .0; 380 } 381 *waits = waits_; 382 } 383 flush_stats()384 void flush_stats() 385 { 386 gu::Lock lock(mutex_); 387 oooe_ = 0; oool_ = 0; win_size_ = 0; entered_ = 0; waits_ = 0; 388 } 389 390 private: 391 392 template <typename T> state_debug_print(const std::string & method,const T & x)393 void state_debug_print(const std::string& method, const T& x) 394 { 395 // #define GALERA_MONITOR_DEBUG_PRINT 396 #ifdef GALERA_MONITOR_DEBUG_PRINT 397 log_info << typeid(C).name() << ": " << method << "(" << x 398 << "): le " << last_entered_ << ", ll " << last_left_; 399 #endif /* GALERA_MONITOR_DEBUG_PRINT */ 400 } 401 indexof(wsrep_seqno_t seqno) const402 size_t indexof(wsrep_seqno_t seqno) const 403 { 404 return (seqno & process_mask_); 405 } 406 may_enter(const C & obj) const407 bool may_enter(const C& obj) const 408 { 409 return obj.condition(last_entered_, last_left_); 410 } 411 412 // wait until it is possible to grab slot in monitor, 413 // update last entered pre_enter(C & obj,gu::Lock & lock)414 void pre_enter(C& obj, gu::Lock& lock) 415 { 416 assert(last_left_ <= last_entered_); 417 418 const wsrep_seqno_t obj_seqno(obj.seqno()); 419 420 while (would_block (obj_seqno)) // TODO: exit on error 421 { 422 lock.wait(cond_); 423 } 424 425 if (last_entered_ < obj_seqno) last_entered_ = obj_seqno; 426 } 427 update_last_left()428 void update_last_left() 429 { 430 for (wsrep_seqno_t i = last_left_ + 1; i <= last_entered_; ++i) 431 { 432 Process& a(process_[indexof(i)]); 433 434 if (Process::S_FINISHED == a.state_) 435 { 436 a.state_ = Process::S_IDLE; 437 last_left_ = i; 438 a.wait_cond_.broadcast(); 439 } 440 else 441 { 442 break; 443 } 444 } 445 assert(last_left_ <= last_entered_); 446 } 447 wake_up_next()448 void wake_up_next() 449 { 450 for (wsrep_seqno_t i = last_left_ + 1; i <= last_entered_; ++i) 451 { 452 Process& a(process_[indexof(i)]); 453 if (a.state_ == Process::S_WAITING && 454 may_enter(*a.obj_) == true) 455 { 456 // We need to set state to APPLYING here because if 457 // it is the last_left_ + 1 and it gets canceled in 458 // the race that follows exit from this function, 459 // there will be nobody to clean up and advance 460 // last_left_. 461 a.state_ = Process::S_APPLYING; 462 a.cond_.signal(); 463 } 464 } 465 } 466 post_leave(wsrep_seqno_t const obj_seqno,gu::Lock & lock)467 void post_leave(wsrep_seqno_t const obj_seqno, gu::Lock& lock) 468 { 469 const size_t idx(indexof(obj_seqno)); 470 471 if (last_left_ + 1 == obj_seqno) // we're shrinking window 472 { 473 process_[idx].state_ = Process::S_IDLE; 474 last_left_ = obj_seqno; 475 process_[idx].wait_cond_.broadcast(); 476 477 update_last_left(); 478 oool_ += (last_left_ > obj_seqno); 479 // wake up waiters that may remain above us (last_left_ 480 // now is max) 481 wake_up_next(); 482 } 483 else 484 { 485 process_[idx].state_ = Process::S_FINISHED; 486 } 487 488 process_[idx].obj_ = 0; 489 490 assert((last_left_ >= obj_seqno && 491 process_[idx].state_ == Process::S_IDLE) || 492 process_[idx].state_ == Process::S_FINISHED); 493 assert(last_left_ != last_entered_ || 494 process_[indexof(last_left_)].state_ == Process::S_IDLE); 495 496 if ((last_left_ >= obj_seqno) || // - occupied window shrinked 497 (last_left_ >= drain_seqno_)) // - this is to notify drain that 498 // we reached drain_seqno_ 499 { 500 cond_.broadcast(); 501 } 502 } 503 drain_common(wsrep_seqno_t seqno,gu::Lock & lock)504 void drain_common(wsrep_seqno_t seqno, gu::Lock& lock) 505 { 506 log_debug << "draining up to " << seqno; 507 508 drain_seqno_ = seqno; 509 510 if (last_left_ > drain_seqno_) 511 { 512 log_warn << "last left " << last_left_ 513 << " greater than drain seqno " << drain_seqno_; 514 #ifndef NDEBUG 515 for (wsrep_seqno_t i = drain_seqno_; i <= last_left_; ++i) 516 { 517 const Process& a(process_[indexof(i)]); 518 log_info << "applier " << i 519 << " in state " << a.state_; 520 } 521 #endif 522 } 523 524 while (last_left_ < drain_seqno_) lock.wait(cond_); 525 } 526 state(const C & obj) const527 typename Process::State state(const C& obj) const 528 { 529 const wsrep_seqno_t obj_seqno(obj.seqno()); 530 const size_t idx(indexof(obj_seqno)); 531 gu::Lock lock(mutex_); 532 while (would_block (obj_seqno)) 533 { 534 lock.wait(cond_); 535 } 536 return process_[idx].state_; 537 } 538 539 Monitor(const Monitor&); 540 void operator=(const Monitor&); 541 542 mutable 543 gu::Mutex mutex_; 544 gu::Cond cond_; 545 wsrep_uuid_t uuid_; 546 wsrep_seqno_t last_entered_; 547 wsrep_seqno_t last_left_; 548 wsrep_seqno_t drain_seqno_; 549 Process* process_; 550 long entered_; // entered 551 long oooe_; // out of order entered 552 long oool_; // out of order left 553 long win_size_; // window between last_left_ and last_entered_ 554 // Total number of waits in the monitor. Incremented before 555 // entering into waiting state. 556 long long waits_; 557 }; 558 } 559 560 #endif // GALERA_APPLY_MONITOR_HPP 561