1 //
2 // Copyright (C) 2010-2013 Codership Oy
3 //
4 
5 #ifndef GALERA_MONITOR_HPP
6 #define GALERA_MONITOR_HPP
7 
8 #include "trx_handle.hpp"
9 #include <gu_lock.hpp> // for gu::Mutex and gu::Cond
10 #include <gu_limits.h>
11 
12 #include <vector>
13 
14 namespace galera
15 {
16     template <class C>
17     class Monitor
18     {
19     private:
20 
21         struct Process
22         {
Processgalera::Monitor::Process23             Process() : obj_(0), cond_(), wait_cond_(), state_(S_IDLE)
24 #ifndef NDEBUG
25                       ,dobj_()
26 #endif /* NDEBUG */
27             { }
28 
29             const C* obj_;
30             gu::Cond cond_;
31             gu::Cond wait_cond_;
32             enum State
33             {
34                 S_IDLE,     // Slot is free
35                 S_WAITING,  // Waiting to enter applying critical section
36                 S_CANCELED,
37                 S_APPLYING, // Applying
38                 S_FINISHED  // Finished
39             } state_;
40 #ifndef NDEBUG
41             C dobj_;
42 #endif /* NDEBUG */
43 
44         private:
45 
46             // non-copyable
47             Process(const Process& other);
48             void operator=(const Process&);
49         };
50 
        // Number of slots in the process_ ring buffer. It has to be a power
        // of two because indexof() maps a seqno to a slot by bit masking.
        static const ssize_t process_size_ = (1ULL << 16);
        // Mask applied to a seqno by indexof() to obtain its slot index.
        static const size_t  process_mask_ = process_size_ - 1;
53     public:
54 
Monitor()55         Monitor()
56             :
57             mutex_(),
58             cond_(),
59             uuid_(WSREP_UUID_UNDEFINED),
60             last_entered_(-1),
61             last_left_(-1),
62             drain_seqno_(GU_LLONG_MAX),
63             process_(new Process[process_size_]),
64             entered_(0),
65             oooe_(0),
66             oool_(0),
67             win_size_(0),
68             waits_(0)
69         { }
70 
~Monitor()71         ~Monitor()
72         {
73             delete[] process_;
74             if (entered_ > 0)
75             {
76                 log_info << "mon: entered " << entered_
77                          << " oooe fraction " << double(oooe_)/entered_
78                          << " oool fraction " << double(oool_)/entered_;
79             }
80             else
81             {
82                 log_info << "apply mon: entered 0";
83             }
84         }
85 
86         /*
87          * For ordered CC events this had to be changed:
88          * - it either resets position to -1 or
89          * - merely advances it to seqno if current position is behind.
90          * Assumes that monitor has been drained.
91          */
set_initial_position(const wsrep_uuid_t & uuid,wsrep_seqno_t const seqno)92         void set_initial_position(const wsrep_uuid_t& uuid,
93                                   wsrep_seqno_t const seqno)
94         {
95             gu::Lock lock(mutex_);
96 
97             state_debug_print("set_initial_position", seqno);
98             uuid_ = uuid;
99             // When the monitor position is reset, either all the
100             // waiters must have been drained or the thread which is
101             // resetting the position must hold the monitor (CC from IST).
102             // Exception is -1 which means that the monitor is being
103             // forcifully reset.
104             assert(seqno == -1 || last_entered_ == last_left_ ||
105                    last_entered_ == seqno);
106             if (last_entered_ == -1 || seqno == -1)
107             {
108                 // first call or reset
109                 last_entered_ = last_left_ = seqno;
110             }
111             else
112 #if 1 // now
113             {
114                 if (last_left_    < seqno)      last_left_    = seqno;
115                 if (last_entered_ < last_left_) last_entered_ = last_left_;
116             }
117 
118             // some drainers may wait for us here
119             cond_.broadcast();
120 #else // before
121             {
122                 // drain monitor up to seqno but don't reset last_entered_
123                 // or last_left_
124                 drain_common(seqno, lock);
125                 drain_seqno_ = GU_LLONG_MAX;
126             }
127 #endif
128             if (seqno != -1)
129             {
130                 const size_t idx(indexof(seqno));
131                 process_[idx].wait_cond_.broadcast();
132             }
133         }
134 
        /*
         * Blocks the calling thread until obj may enter the monitor, then
         * marks its slot S_APPLYING and returns.
         *
         * All steps run under mutex_:
         *  - pre_enter() waits until would_block() is false for the seqno
         *    and raises last_entered_ to it;
         *  - unless the slot was already canceled, it becomes S_WAITING and
         *    the thread sleeps on the slot's cond_ until may_enter(obj)
         *    holds or the slot leaves the S_WAITING state;
         *  - on successful entry the statistics counters are updated.
         *
         * If the slot ends up S_CANCELED (set by interrupt()), it is reset
         * to S_IDLE and gu_throw_error(EINTR) is invoked instead of
         * returning normally.
         */
        void enter(C& obj)
        {
            const wsrep_seqno_t obj_seqno(obj.seqno());
            const size_t        idx(indexof(obj_seqno));
            gu::Lock            lock(mutex_);

            state_debug_print("enter", obj);
            assert(obj_seqno > last_left_);

            pre_enter(obj, lock);

            // The slot may have been canceled by interrupt() before this
            // thread got here; in that case skip straight to the cancel path.
            if (gu_likely(process_[idx].state_ != Process::S_CANCELED))
            {
                assert(process_[idx].state_ == Process::S_IDLE);

                process_[idx].state_ = Process::S_WAITING;
                process_[idx].obj_   = &obj;
#ifndef NDEBUG
                // Replace the debug copy in place: destroy the old one and
                // copy-construct the new one into the same storage.
                process_[idx].dobj_.~C();
                new (&process_[idx].dobj_) C(obj);
#endif /* NDEBUG */
#ifdef GU_DBUG_ON
                obj.debug_sync(mutex_);
#endif // GU_DBUG_ON
                // Leave the loop when the entry condition holds or when
                // somebody changed the slot state (wake_up_next() sets
                // S_APPLYING, interrupt() sets S_CANCELED).
                while (may_enter(obj) == false &&
                       process_[idx].state_ == Process::S_WAITING)
                {
                    ++waits_;
                    lock.wait(process_[idx].cond_);
                }

                if (process_[idx].state_ != Process::S_CANCELED)
                {
                    assert(process_[idx].state_ == Process::S_WAITING ||
                           process_[idx].state_ == Process::S_APPLYING);

                    process_[idx].state_ = Process::S_APPLYING;

                    ++entered_;
                    // counted as out-of-order entry when obj is not the
                    // immediate successor of last_left_
                    oooe_     += ((last_left_ + 1) < obj_seqno);
                    // accumulate the current window size for averaging
                    win_size_ += (last_entered_ - last_left_);
                    return;
                }
            }

            // Canceled: free the slot and report the interruption.
            assert(process_[idx].state_ == Process::S_CANCELED);
            process_[idx].state_ = Process::S_IDLE;

            state_debug_print("enter canceled", obj);
            gu_throw_error(EINTR);
        }
186 
entered(const C & obj) const187         bool entered(const C& obj) const
188         {
189             return state(obj) == Process::S_APPLYING;
190         }
191 
finished(const C & obj) const192         bool finished(const C& obj) const
193         {
194             return state(obj) == Process::S_FINISHED;
195         }
196 
canceled(const C & obj) const197         bool canceled(const C& obj) const
198         {
199             return state(obj) == Process::S_CANCELED;
200         }
201 
leave(const C & obj)202         void leave(const C& obj)
203         {
204 #ifndef NDEBUG
205             size_t   idx(indexof(obj.seqno()));
206 #endif /* NDEBUG */
207             gu::Lock lock(mutex_);
208             state_debug_print("leave", obj);
209 
210             assert(process_[idx].state_ == Process::S_APPLYING ||
211                    process_[idx].state_ == Process::S_CANCELED);
212 
213             assert(process_[indexof(last_left_)].state_ == Process::S_IDLE);
214 
215             post_leave(obj.seqno(), lock);
216         }
217 
self_cancel(C & obj)218         void self_cancel(C& obj)
219         {
220             wsrep_seqno_t const obj_seqno(obj.seqno());
221             size_t   idx(indexof(obj_seqno));
222             gu::Lock lock(mutex_);
223 
224             state_debug_print("self_cancel", obj);
225 
226             assert(obj_seqno > last_left_);
227 
228             while (obj_seqno - last_left_ >= process_size_)
229                 // TODO: exit on error
230             {
231                 log_warn << "Trying to self-cancel seqno out of process "
232                          << "space: obj_seqno - last_left_ = " << obj_seqno
233                          << " - " << last_left_ << " = "
234                          << (obj_seqno - last_left_)
235                          << ", process_size_: "  << process_size_
236                          << ". Deadlock is very likely.";
237 
238                 lock.wait(cond_);
239             }
240 
241             assert(process_[idx].state_ == Process::S_IDLE ||
242                    process_[idx].state_ == Process::S_CANCELED);
243 
244 #ifndef NDEBUG
245             process_[idx].dobj_.~C();
246             new (&process_[idx].dobj_) C(obj);
247 #endif /* NDEBUG */
248 
249             if (obj_seqno > last_entered_) last_entered_ = obj_seqno;
250 
251             if (obj_seqno <= drain_seqno_)
252             {
253                 post_leave(obj.seqno(), lock);
254             }
255             else
256             {
257                 process_[idx].state_ = Process::S_FINISHED;
258             }
259         }
260 
interrupt(const C & obj)261         bool interrupt(const C& obj)
262         {
263             size_t   idx (indexof(obj.seqno()));
264             gu::Lock lock(mutex_);
265 
266             while (obj.seqno() - last_left_ >= process_size_)
267                 // TODO: exit on error
268             {
269                 lock.wait(cond_);
270             }
271 
272             state_debug_print("interrupt", obj);
273 
274             if ((process_[idx].state_ == Process::S_IDLE &&
275                  obj.seqno()          >  last_left_ ) ||
276                 process_[idx].state_ == Process::S_WAITING )
277             {
278                 process_[idx].state_ = Process::S_CANCELED;
279                 process_[idx].cond_.signal();
280                 // since last_left + 1 cannot be <= S_WAITING we're not
281                 // modifying a window here. No broadcasting.
282                 return true;
283             }
284             else
285             {
286                 log_debug << "interrupting " << obj.seqno()
287                           << " state " << process_[idx].state_
288                           << " le " << last_entered_
289                           << " ll " << last_left_;
290             }
291 
292             return false;
293         }
294 
last_left() const295         wsrep_seqno_t last_left() const
296         {
297             gu::Lock lock(mutex_);
298             return last_left_;
299         }
300 
last_entered() const301         wsrep_seqno_t last_entered() const
302         {
303             gu::Lock lock(mutex_);
304             return last_entered_;
305         }
306 
last_left_gtid(wsrep_gtid_t & gtid) const307         void last_left_gtid(wsrep_gtid_t& gtid) const
308         {
309             gu::Lock lock(mutex_);
310             gtid.uuid = uuid_;
311             gtid.seqno = last_left_;
312         }
313 
size() const314         ssize_t       size()        const { return process_size_; }
315 
would_block(wsrep_seqno_t seqno) const316         bool would_block (wsrep_seqno_t seqno) const
317         {
318             return (seqno - last_left_ >= process_size_ ||
319                     seqno > drain_seqno_);
320         }
321 
drain(wsrep_seqno_t seqno)322         void drain(wsrep_seqno_t seqno)
323         {
324             gu::Lock lock(mutex_);
325 
326             state_debug_print("drain", seqno);
327 
328             while (drain_seqno_ != GU_LLONG_MAX)
329             {
330                 lock.wait(cond_);
331             }
332 
333             drain_common(seqno, lock);
334 
335             // there can be some stale canceled entries
336             update_last_left();
337 
338             drain_seqno_ = GU_LLONG_MAX;
339             cond_.broadcast();
340         }
341 
wait(wsrep_seqno_t seqno)342         void wait(wsrep_seqno_t seqno)
343         {
344             gu::Lock lock(mutex_);
345             while (last_left_ < seqno)
346             {
347                 size_t idx(indexof(seqno));
348                 lock.wait(process_[idx].wait_cond_);
349             }
350         }
351 
wait(gu::GTID & gtid,const gu::datetime::Date & wait_until)352         void wait(gu::GTID& gtid, const gu::datetime::Date& wait_until)
353         {
354             gu::Lock lock(mutex_);
355             if (gtid.uuid() != uuid_)
356             {
357                 throw gu::NotFound();
358             }
359             while (last_left_ < gtid.seqno())
360             {
361                 size_t idx(indexof(gtid.seqno()));
362                 lock.wait(process_[idx].wait_cond_, wait_until);
363             }
364         }
365 
get_stats(double * oooe,double * oool,double * win_size,long long * waits) const366         void get_stats(double* oooe, double* oool, double* win_size,
367                        long long* waits) const
368         {
369             gu::Lock lock(mutex_);
370 
371             if (entered_ > 0)
372             {
373                 *oooe = (oooe_ > 0 ? double(oooe_)/entered_ : .0);
374                 *oool = (oool_ > 0 ? double(oool_)/entered_ : .0);
375                 *win_size = (win_size_ > 0 ? double(win_size_)/entered_ : .0);
376             }
377             else
378             {
379                 *oooe = .0; *oool = .0; *win_size = .0;
380             }
381             *waits = waits_;
382         }
383 
flush_stats()384         void flush_stats()
385         {
386             gu::Lock lock(mutex_);
387             oooe_ = 0; oool_ = 0; win_size_ = 0; entered_ = 0; waits_ = 0;
388         }
389 
390     private:
391 
392         template <typename T>
state_debug_print(const std::string & method,const T & x)393         void state_debug_print(const std::string& method, const T& x)
394         {
395 // #define GALERA_MONITOR_DEBUG_PRINT
396 #ifdef GALERA_MONITOR_DEBUG_PRINT
397             log_info << typeid(C).name() << ": " << method << "(" << x
398                      << "): le " << last_entered_ << ", ll " << last_left_;
399 #endif /* GALERA_MONITOR_DEBUG_PRINT */
400         }
401 
indexof(wsrep_seqno_t seqno) const402         size_t indexof(wsrep_seqno_t seqno) const
403         {
404             return (seqno & process_mask_);
405         }
406 
may_enter(const C & obj) const407         bool may_enter(const C& obj) const
408         {
409             return obj.condition(last_entered_, last_left_);
410         }
411 
412         // wait until it is possible to grab slot in monitor,
413         // update last entered
pre_enter(C & obj,gu::Lock & lock)414         void pre_enter(C& obj, gu::Lock& lock)
415         {
416             assert(last_left_ <= last_entered_);
417 
418             const wsrep_seqno_t obj_seqno(obj.seqno());
419 
420             while (would_block (obj_seqno)) // TODO: exit on error
421             {
422                 lock.wait(cond_);
423             }
424 
425             if (last_entered_ < obj_seqno) last_entered_ = obj_seqno;
426         }
427 
update_last_left()428         void update_last_left()
429         {
430             for (wsrep_seqno_t i = last_left_ + 1; i <= last_entered_; ++i)
431             {
432                 Process& a(process_[indexof(i)]);
433 
434                 if (Process::S_FINISHED == a.state_)
435                 {
436                     a.state_   = Process::S_IDLE;
437                     last_left_ = i;
438                     a.wait_cond_.broadcast();
439                 }
440                 else
441                 {
442                     break;
443                 }
444             }
445             assert(last_left_ <= last_entered_);
446         }
447 
wake_up_next()448         void wake_up_next()
449         {
450             for (wsrep_seqno_t i = last_left_ + 1; i <= last_entered_; ++i)
451             {
452                 Process& a(process_[indexof(i)]);
453                 if (a.state_           == Process::S_WAITING &&
454                     may_enter(*a.obj_) == true)
455                 {
456                     // We need to set state to APPLYING here because if
457                     // it is  the last_left_ + 1 and it gets canceled in
458                     // the race  that follows exit from this function,
459                     // there will be  nobody to clean up and advance
460                     // last_left_.
461                     a.state_ = Process::S_APPLYING;
462                     a.cond_.signal();
463                 }
464             }
465         }
466 
        /*
         * Common bookkeeping for an object leaving the monitor; shared by
         * leave() and self_cancel(). Must be called with mutex_ held (the
         * lock argument is not used by the body itself).
         *
         * If obj_seqno is the immediate successor of last_left_, the slot
         * is freed, last_left_ moves onto it and further over any already
         * finished slots, and eligible waiters are woken up. Otherwise the
         * slot is only marked S_FINISHED, to be collected later by
         * update_last_left().
         */
        void post_leave(wsrep_seqno_t const obj_seqno, gu::Lock& lock)
        {
            const size_t idx(indexof(obj_seqno));

            if (last_left_ + 1 == obj_seqno) // we're shrinking window
            {
                process_[idx].state_ = Process::S_IDLE;
                last_left_           = obj_seqno;
                // wake up threads blocked in wait() on this seqno
                process_[idx].wait_cond_.broadcast();

                update_last_left();
                // counted as out-of-order leave when later seqnos had
                // already finished before this one
                oool_ += (last_left_ > obj_seqno);
                // wake up waiters that may remain above us (last_left_
                // now is max)
                wake_up_next();
            }
            else
            {
                process_[idx].state_ = Process::S_FINISHED;
            }

            process_[idx].obj_ = 0;

            assert((last_left_ >= obj_seqno &&
                    process_[idx].state_ == Process::S_IDLE) ||
                   process_[idx].state_ == Process::S_FINISHED);
            assert(last_left_ != last_entered_ ||
                   process_[indexof(last_left_)].state_ == Process::S_IDLE);

            if ((last_left_ >= obj_seqno) ||  // - occupied window shrank
                (last_left_ >= drain_seqno_)) // - this is to notify drain that
                                              //   we reached drain_seqno_
            {
                cond_.broadcast();
            }
        }
503 
drain_common(wsrep_seqno_t seqno,gu::Lock & lock)504         void drain_common(wsrep_seqno_t seqno, gu::Lock& lock)
505         {
506             log_debug << "draining up to " << seqno;
507 
508             drain_seqno_ = seqno;
509 
510             if (last_left_ > drain_seqno_)
511             {
512                 log_warn << "last left " << last_left_
513                          << " greater than drain seqno " << drain_seqno_;
514 #ifndef NDEBUG
515                 for (wsrep_seqno_t i = drain_seqno_; i <= last_left_; ++i)
516                 {
517                     const Process& a(process_[indexof(i)]);
518                     log_info << "applier " << i
519                              << " in state " << a.state_;
520                 }
521 #endif
522             }
523 
524             while (last_left_ < drain_seqno_) lock.wait(cond_);
525         }
526 
state(const C & obj) const527         typename Process::State state(const C& obj) const
528         {
529             const wsrep_seqno_t obj_seqno(obj.seqno());
530             const size_t        idx(indexof(obj_seqno));
531             gu::Lock lock(mutex_);
532             while (would_block (obj_seqno))
533             {
534                 lock.wait(cond_);
535             }
536             return process_[idx].state_;
537         }
538 
        // non-copyable: declared but intentionally left undefined
        Monitor(const Monitor&);
        void operator=(const Monitor&);

        // Guards the monitor state; mutable so that const accessors can
        // lock it.
        mutable
        gu::Mutex mutex_;
        // Monitor-wide condition: broadcast by post_leave(), drain() and
        // set_initial_position(); waited on by threads blocked on window
        // space or on a drain.
        gu::Cond  cond_;
        // UUID set by set_initial_position(); reported by last_left_gtid()
        // and checked by wait(GTID).
        wsrep_uuid_t  uuid_;
        // Highest seqno that has passed pre_enter() or self_cancel().
        wsrep_seqno_t last_entered_;
        // Position advanced contiguously by post_leave() and
        // update_last_left(), or set by set_initial_position().
        wsrep_seqno_t last_left_;
        // Seqno up to which a drain is in progress; GU_LLONG_MAX when none.
        wsrep_seqno_t drain_seqno_;
        // Array of process_size_ slots indexed by indexof(seqno); owned,
        // released in the destructor.
        Process*      process_;
        long entered_;  // number of successful enter() calls
        long oooe_;     // out of order entered
        long oool_;     // out of order left
        long win_size_; // window between last_left_ and last_entered_
        // Total number of waits in the monitor. Incremented before
        // entering into waiting state.
        long long waits_;
557     };
558 }
559 
#endif // GALERA_MONITOR_HPP
561