1 //
2 // Copyright (C) 2015 Codership Oy <info@codership.com>
3 //
4 
5 #ifndef GALERA_NBO_HPP
6 #define GALERA_NBO_HPP
7 
8 #include "galera_view.hpp"
9 
10 #include "gu_buffer.hpp"
11 #include "gu_serialize.hpp"
12 #include "gu_logger.hpp"
13 #include "gu_lock.hpp"
14 
15 #include "trx_handle.hpp"
16 
17 #include "wsrep_api.h"
18 
19 #include <map>
20 
21 namespace galera
22 {
23     class TrxHandleSlave;
24     class MappedBuffer;
25 
26     // Helper datatypes for NBO
27 
28     // Context to be shared between cert NBOEntry and TrxHandleSlave
29     // to signal ending of NBO.
30     class NBOCtx
31     {
32     public:
NBOCtx()33         NBOCtx()
34             :
35             mutex_(),
36             cond_ (),
37             ts_   (),
38             aborted_(false)
39         { }
40 
41 
set_ts(const TrxHandleSlavePtr & ts)42         void set_ts(const TrxHandleSlavePtr& ts)
43         {
44             gu::Lock lock(mutex_);
45             assert(ts != 0);
46             assert(ts->global_seqno() != WSREP_SEQNO_UNDEFINED);
47             ts_ = ts;
48             cond_.broadcast();
49         }
50 
seqno() const51         wsrep_seqno_t seqno() const
52         {
53             gu::Lock lock(mutex_);
54             return (ts_ == 0 ? WSREP_SEQNO_UNDEFINED : ts_->global_seqno());
55         }
56 
wait_ts()57         TrxHandleSlavePtr wait_ts()
58         {
59             gu::Lock lock(mutex_);
60             while (ts_ == 0)
61             {
62                 try
63                 {
64                     lock.wait(cond_, gu::datetime::Date::calendar()
65                               + gu::datetime::Sec);
66                 }
67                 catch (const gu::Exception& e)
68                 {
69                     if (e.get_errno() == ETIMEDOUT)
70                     {
71                         return TrxHandleSlavePtr();
72                     }
73                     throw;
74                 }
75             }
76             return ts_;
77         }
78 
set_aborted(bool val)79         void set_aborted(bool val)
80         {
81             gu::Lock lock(mutex_);
82             aborted_= val;
83             cond_.broadcast();
84         }
85 
aborted() const86         bool aborted() const
87         {
88             gu::Lock lock(mutex_);
89             return aborted_;
90         }
91 
92     private:
93         NBOCtx(const NBOCtx&);
94         NBOCtx& operator=(const NBOCtx&);
95 
96         gu::Mutex         mutex_;
97         gu::Cond          cond_;
98         TrxHandleSlavePtr ts_;
99         bool              aborted_;
100     };
101 
102     // Key for NBOMap
103     class NBOKey
104     {
105     public:
NBOKey()106         NBOKey() : seqno_(WSREP_SEQNO_UNDEFINED) { }
107 
NBOKey(const wsrep_seqno_t seqno)108         NBOKey(const wsrep_seqno_t seqno)
109             :
110             seqno_(seqno)
111         { }
112 
seqno() const113         wsrep_seqno_t seqno() const { return seqno_; }
114 
115 
operator <(const NBOKey & other) const116         bool operator<(const NBOKey& other) const
117         {
118             return (seqno_ < other.seqno_);
119         }
120 
serialize(gu::byte_t * buf,size_t buf_len,size_t offset)121         size_t serialize(gu::byte_t* buf, size_t buf_len, size_t offset)
122         {
123             return gu::serialize8(seqno_, buf, buf_len, offset);
124         }
125 
unserialize(const gu::byte_t * buf,size_t buf_len,size_t offset)126         size_t unserialize(const gu::byte_t* buf, size_t buf_len, size_t offset)
127         {
128             return gu::unserialize8(buf, buf_len, offset, seqno_);
129         }
serial_size()130         static size_t serial_size()
131         {
132             return 8; //gu::serial_size8(wsrep_seqno_t());
133         }
134 
135     private:
136         wsrep_seqno_t seqno_;
137     };
138 
139 
140     // Entry for NBOMap
141     class NBOEntry
142     {
143     public:
NBOEntry(gu::shared_ptr<TrxHandleSlave>::type ts,gu::shared_ptr<MappedBuffer>::type buf,gu::shared_ptr<NBOCtx>::type nbo_ctx)144         NBOEntry(
145             gu::shared_ptr<TrxHandleSlave>::type ts,
146             gu::shared_ptr<MappedBuffer>::type buf,
147             gu::shared_ptr<NBOCtx>::type nbo_ctx)
148             :
149             ts_ (ts),
150             buf_(buf),
151             ended_set_(),
152             nbo_ctx_(nbo_ctx)
153         { }
ts_ptr()154         TrxHandleSlave* ts_ptr() { return ts_.get(); }
155         // const TrxHandleSlave* ts_ptr() const { return ts_.get(); }
add_ended(const wsrep_uuid_t & uuid)156         void add_ended(const wsrep_uuid_t& uuid)
157         {
158             std::pair<View::MembSet::iterator, bool> ret(
159                 ended_set_.insert(uuid));
160             if (ret.second == false)
161             {
162                 log_warn << "duplicate entry "
163                          << uuid << " for ended set";
164             }
165         }
166 
clear_ended()167         void clear_ended()
168         {
169             ended_set_.clear();
170         }
ended_set() const171         const View::MembSet& ended_set() const { return ended_set_; }
end(const TrxHandleSlavePtr & ts)172         void end(const TrxHandleSlavePtr& ts)
173         {
174             assert(ts != 0);
175             nbo_ctx_->set_ts(ts);
176         }
nbo_ctx()177         gu::shared_ptr<NBOCtx>::type nbo_ctx() { return nbo_ctx_; }
178 
179     private:
180         gu::shared_ptr<TrxHandleSlave>::type ts_;
181         gu::shared_ptr<MappedBuffer>::type   buf_;
182         View::MembSet                        ended_set_;
183         gu::shared_ptr<NBOCtx>::type         nbo_ctx_;
184     };
185 
186     typedef std::map<NBOKey, gu::shared_ptr<NBOCtx>::type> NBOCtxMap;
187     typedef std::map<NBOKey, NBOEntry> NBOMap;
188 }
189 
190 
191 #endif // !GALERA_NBO_HPP
192