1 //
2 // Copyright (C) 2010-2018 Codership Oy <info@codership.com>
3 //
4 
5 #ifndef GALERA_CERTIFICATION_HPP
6 #define GALERA_CERTIFICATION_HPP
7 
8 
#include "nbo.hpp"
#include "trx_handle.hpp"
#include "key_entry_ng.hpp"
#include "galera_service_thd.hpp"
#include "galera_view.hpp"

#include <gu_shared_ptr.hpp>
#include <gu_unordered.hpp>
#include <gu_lock.hpp>
#include <gu_config.hpp>
#include <gu_gtid.hpp>

#include <algorithm> // std::min
#include <cassert>   // assert
#include <list>
#include <map>
#include <set>       // std::multiset
#include <string>
23 
24 namespace galera
25 {
26     class Certification
27     {
28     public:
29 
30         static std::string const PARAM_LOG_CONFLICTS;
31         static std::string const PARAM_OPTIMISTIC_PA;
32 
33         static void register_params(gu::Config&);
34 
35         typedef gu::UnorderedSet<KeyEntryOS*,
36                                  KeyEntryPtrHash, KeyEntryPtrEqual> CertIndex;
37 
38         typedef gu::UnorderedSet<KeyEntryNG*,
39                                  KeyEntryPtrHashNG, KeyEntryPtrEqualNG>
40         CertIndexNG;
41 
42         typedef gu::UnorderedMultiset<KeyEntryNG*,
43                                       KeyEntryPtrHashNG, KeyEntryPtrEqualNG>
44         CertIndexNBO;
45 
46     private:
47 
48         typedef std::multiset<wsrep_seqno_t>             DepsSet;
49 
50         typedef std::map<wsrep_seqno_t, TrxHandleSlavePtr> TrxMap;
51 
52     public:
53 
54         typedef enum
55         {
56             TEST_OK,
57             TEST_FAILED
58         } TestResult;
59 
60         Certification(gu::Config& conf, ServiceThd* thd);
61         ~Certification();
62 
63         void assign_initial_position(const gu::GTID& gtid, int version);
64         TestResult append_trx(const TrxHandleSlavePtr&);
65         TestResult test(const TrxHandleSlavePtr&, bool store_keys);
position() const66         wsrep_seqno_t position() const { return position_; }
67         wsrep_seqno_t increment_position(); /* for dummy IST events */
68 
69         /* this is for configuration change use */
70         void adjust_position(const View&, const gu::GTID& gtid, int version);
71 
72         wsrep_seqno_t
get_safe_to_discard_seqno() const73         get_safe_to_discard_seqno() const
74         {
75             gu::Lock lock(mutex_);
76             return get_safe_to_discard_seqno_();
77         }
78 
79         wsrep_seqno_t
purge_trxs_upto(wsrep_seqno_t const seqno,bool const handle_gcache)80         purge_trxs_upto(wsrep_seqno_t const seqno, bool const handle_gcache)
81         {
82             gu::Lock lock(mutex_);
83             const wsrep_seqno_t stds(get_safe_to_discard_seqno_());
84             // assert(seqno <= get_safe_to_discard_seqno());
85             // Note: setting trx committed is not done in total order so
86             // safe to discard seqno may decrease. Enable assertion above when
87             // this issue is fixed.
88             return purge_trxs_upto_(std::min(seqno, stds), handle_gcache);
89         }
90 
91         // Set trx corresponding to handle committed. Return purge seqno if
92         // index purge is required, -1 otherwise.
93         wsrep_seqno_t set_trx_committed(TrxHandleSlave&);
94 
95         // statistics section
stats_get(double & avg_cert_interval,double & avg_deps_dist,size_t & index_size) const96         void stats_get(double& avg_cert_interval,
97                        double& avg_deps_dist,
98                        size_t& index_size) const
99         {
100             gu::Lock lock(stats_mutex_);
101             avg_cert_interval = 0;
102             avg_deps_dist = 0;
103             if (n_certified_)
104             {
105                 avg_cert_interval = double(cert_interval_) / n_certified_;
106                 avg_deps_dist = double(deps_dist_) / n_certified_;
107             }
108             index_size = index_size_;
109         }
110 
stats_reset()111         void stats_reset()
112         {
113             gu::Lock lock(stats_mutex_);
114             cert_interval_ = 0;
115             deps_dist_ = 0;
116             n_certified_ = 0;
117             index_size_ = 0;
118         }
119 
120         void param_set(const std::string& key, const std::string& value);
121 
lowest_trx_seqno() const122         wsrep_seqno_t lowest_trx_seqno() const
123         {
124             return (trx_map_.empty() ? position_ : trx_map_.begin()->first);
125         }
126 
127         //
128         // NBO context lifecycle:
129         // * Context is created when NBO-start event is received
130         // * Context stays in nbo_ctx_map_ until client calls erase_nbo_ctx()
131         //
132 
133         // Get NBO context matching to global seqno
134         gu::shared_ptr<NBOCtx>::type nbo_ctx(wsrep_seqno_t);
135         // Erase NBO context entry
136         void erase_nbo_ctx(wsrep_seqno_t);
nbo_size() const137         size_t nbo_size() const { return nbo_map_.size(); }
138 
139         void mark_inconsistent();
is_inconsistent() const140         bool is_inconsistent() const { return inconsistent_; }
141 
142     private:
143 
144         // Non-copyable
145         Certification(const Certification&);
146         Certification& operator=(const Certification&);
147 
148         TestResult do_test(const TrxHandleSlavePtr&, bool store_keys);
149         TestResult do_test_v3to5(TrxHandleSlave*, bool);
150         TestResult do_test_preordered(TrxHandleSlave*);
151         TestResult do_test_nbo(const TrxHandleSlavePtr&);
152         void purge_for_trx(TrxHandleSlave*);
153 
154         // unprotected variants for internal use
155         wsrep_seqno_t get_safe_to_discard_seqno_() const;
156         wsrep_seqno_t purge_trxs_upto_(wsrep_seqno_t, bool sync);
157 
158         gu::shared_ptr<NBOCtx>::type nbo_ctx_unlocked(wsrep_seqno_t);
159 
index_purge_required()160         bool index_purge_required()
161         {
162             static unsigned int const KEYS_THRESHOLD (1   << 10); // 1K
163             static unsigned int const BYTES_THRESHOLD(128 << 20); // 128M
164             static unsigned int const TRXS_THRESHOLD (127);
165 
166             /* if either key count, byte count or trx count exceed their
167              * threshold, zero up counts and return true. */
168             return ((key_count_  > KEYS_THRESHOLD  ||
169                      byte_count_ > BYTES_THRESHOLD ||
170                      trx_count_  > TRXS_THRESHOLD)
171                      &&
172                      (key_count_ = 0, byte_count_ = 0, trx_count_ = 0, true));
173         }
174 
175         class PurgeAndDiscard
176         {
177         public:
178 
PurgeAndDiscard(Certification & cert)179             PurgeAndDiscard(Certification& cert) : cert_(cert) { }
180 
operator ()(TrxMap::value_type & vt) const181             void operator()(TrxMap::value_type& vt) const
182             {
183                 {
184                     TrxHandleSlave* trx(vt.second.get());
185                     // Trying to lock trx mutex here may cause deadlock
186                     // with streaming replication. Locking can be skipped
187                     // because trx is only read here and refcount uses atomics.
188                     // Memory barrier is provided by certification mutex.
189                     //
190                     // TrxHandleLock   lock(*trx);
191 
192                     if (!cert_.is_inconsistent())
193                     {
194                         assert(trx->is_committed() == true);
195                         if (trx->is_committed() == false)
196                         {
197                             log_warn <<"trx not committed in purge and discard: "
198                                      << *trx;
199                         }
200                     }
201 
202                     // If depends seqno is not WSREP_SEQNO_UNDEFINED
203                     // write set certification has passed and keys have been
204                     // inserted into index and purge is needed.
205                     // TOI write sets will always pass regular certification
206                     // and keys will be inserted, however if they fail
207                     // NBO certification depends seqno is set to
208                     // WSREP_SEQNO_UNDEFINED. Therefore purge should always
209                     // be done for TOI write sets.
210                     if (trx->is_dummy() == false || trx->is_toi() == true)
211                     {
212                         cert_.purge_for_trx(trx);
213                     }
214                 }
215             }
216 
PurgeAndDiscard(const PurgeAndDiscard & other)217             PurgeAndDiscard(const PurgeAndDiscard& other) : cert_(other.cert_)
218             { }
219 
220         private:
221 
222             void operator=(const PurgeAndDiscard&);
223             Certification& cert_;
224         };
225 
226         int           version_;
227         gu::Config&   conf_;
228         TrxMap        trx_map_;
229         CertIndexNG   cert_index_ng_;
230         NBOMap        nbo_map_;
231         NBOCtxMap     nbo_ctx_map_;
232         CertIndexNBO  nbo_index_;
233         TrxHandleSlave::Pool nbo_pool_;
234         DepsSet       deps_set_;
235         View          current_view_;
236         ServiceThd*   service_thd_;
237         gu::Mutex     mutex_;
238         size_t        trx_size_warn_count_;
239         wsrep_seqno_t initial_position_;
240         wsrep_seqno_t position_;
241         wsrep_seqno_t nbo_position_;
242         wsrep_seqno_t safe_to_discard_seqno_;
243         wsrep_seqno_t last_pa_unsafe_;
244         wsrep_seqno_t last_preordered_seqno_;
245         wsrep_trx_id_t last_preordered_id_;
246         gu::Mutex     stats_mutex_;
247         size_t        n_certified_;
248         wsrep_seqno_t deps_dist_;
249         wsrep_seqno_t cert_interval_;
250         size_t        index_size_;
251 
252         size_t        key_count_;
253         size_t        byte_count_;
254         size_t        trx_count_;
255 
256         /* The only reason those are not static constants is because
257          * there might be a need to thange them without recompilation.
258          * see #454 */
259         int          const max_length_; /* Purge trx_map_ when it exceeds this
260                                           * NOTE: this effectively sets a limit
261                                           * on trx certification interval */
262 
263         unsigned int const max_length_check_; /* Mask how often to check */
264 
265         bool               inconsistent_;
266         bool               log_conflicts_;
267         bool               optimistic_pa_;
268     };
269 }
270 
271 #endif // GALERA_CERTIFICATION_HPP
272