1 //
2 // Copyright (C) 2010-2018 Codership Oy <info@codership.com>
3 //
4 
5 #ifndef GALERA_CERTIFICATION_HPP
6 #define GALERA_CERTIFICATION_HPP
7 
8 #include "trx_handle.hpp"
9 #include "key_entry_ng.hpp"
10 #include "galera_service_thd.hpp"
11 
12 #include "gu_unordered.hpp"
13 #include "gu_lock.hpp"
14 #include "gu_config.hpp"
15 
16 #include <map>
17 #include <set>
18 #include <list>
19 
20 namespace galera
21 {
22     class Certification
23     {
24     public:
25 
26         static std::string const PARAM_LOG_CONFLICTS;
27         static std::string const PARAM_OPTIMISTIC_PA;
28 
29         static void register_params(gu::Config&);
30 
31         typedef gu::UnorderedSet<KeyEntryOS*,
32                                  KeyEntryPtrHash, KeyEntryPtrEqual> CertIndex;
33 
34         typedef gu::UnorderedSet<KeyEntryNG*,
35                                  KeyEntryPtrHashNG, KeyEntryPtrEqualNG>
36         CertIndexNG;
37 
38     private:
39 
40         typedef std::multiset<wsrep_seqno_t>        DepsSet;
41 
42         typedef std::map<wsrep_seqno_t, TrxHandle*> TrxMap;
43 
44     public:
45 
46         typedef enum
47         {
48             TEST_OK,
49             TEST_FAILED
50         } TestResult;
51 
52         Certification(gu::Config& conf, ServiceThd& thd);
53         ~Certification();
54 
55         void assign_initial_position(wsrep_seqno_t seqno, int versiono);
56         TestResult append_trx(TrxHandle*);
57         TestResult test(TrxHandle*, bool = true);
position() const58         wsrep_seqno_t position() const { return position_; }
59 
60         wsrep_seqno_t
get_safe_to_discard_seqno() const61         get_safe_to_discard_seqno() const
62         {
63             gu::Lock lock(mutex_);
64             return get_safe_to_discard_seqno_();
65         }
66 
67         wsrep_seqno_t
purge_trxs_upto(wsrep_seqno_t const seqno,bool const handle_gcache)68         purge_trxs_upto(wsrep_seqno_t const seqno, bool const handle_gcache)
69         {
70             gu::Lock lock(mutex_);
71             const wsrep_seqno_t stds(get_safe_to_discard_seqno_());
72             // assert(seqno <= get_safe_to_discard_seqno());
73             // Note: setting trx committed is not done in total order so
74             // safe to discard seqno may decrease. Enable assertion above when
75             // this issue is fixed.
76             return purge_trxs_upto_(std::min(seqno, stds), handle_gcache);
77         }
78 
79         // Set trx corresponding to handle committed. Return purge seqno if
80         // index purge is required, -1 otherwise.
81         wsrep_seqno_t set_trx_committed(TrxHandle*);
82         TrxHandle* get_trx(wsrep_seqno_t);
83 
84         // statistics section
stats_get(double & avg_cert_interval,double & avg_deps_dist,size_t & index_size) const85         void stats_get(double& avg_cert_interval,
86                        double& avg_deps_dist,
87                        size_t& index_size) const
88         {
89             gu::Lock lock(stats_mutex_);
90             avg_cert_interval = 0;
91             avg_deps_dist = 0;
92             if (n_certified_)
93             {
94                 avg_cert_interval = double(cert_interval_) / n_certified_;
95                 avg_deps_dist = double(deps_dist_) / n_certified_;
96             }
97             index_size = index_size_;
98         }
99 
stats_reset()100         void stats_reset()
101         {
102             gu::Lock lock(stats_mutex_);
103             cert_interval_ = 0;
104             deps_dist_ = 0;
105             n_certified_ = 0;
106             index_size_ = 0;
107         }
108 
109         void param_set(const std::string& key, const std::string& value);
110 
111     private:
112 
113         TestResult do_test(TrxHandle*, bool);
114         TestResult do_test_v1to2(TrxHandle*, bool);
115         TestResult do_test_v3to4(TrxHandle*, bool);
116         TestResult do_test_preordered(TrxHandle*);
117         void purge_for_trx(TrxHandle*);
118         void purge_for_trx_v1to2(TrxHandle*);
119         void purge_for_trx_v3(TrxHandle*);
120 
121         // unprotected variants for internal use
122         wsrep_seqno_t get_safe_to_discard_seqno_() const;
123         wsrep_seqno_t purge_trxs_upto_(wsrep_seqno_t, bool sync);
124 
index_purge_required()125         bool index_purge_required()
126         {
127             static unsigned int const KEYS_THRESHOLD (1   << 10); // 1K
128             static unsigned int const BYTES_THRESHOLD(128 << 20); // 128M
129             static unsigned int const TRXS_THRESHOLD (127);
130 
131             /* if either key count, byte count or trx count exceed their
132              * threshold, zero up counts and return true. */
133             return ((key_count_  > KEYS_THRESHOLD  ||
134                      byte_count_ > BYTES_THRESHOLD ||
135                      trx_count_  > TRXS_THRESHOLD)
136                      &&
137                      (key_count_ = 0, byte_count_ = 0, trx_count_ = 0, true));
138         }
139 
140         class PurgeAndDiscard
141         {
142         public:
143 
PurgeAndDiscard(Certification & cert)144             PurgeAndDiscard(Certification& cert) : cert_(cert) { }
145 
operator ()(TrxMap::value_type & vt) const146             void operator()(TrxMap::value_type& vt) const
147             {
148                 {
149                     TrxHandle* trx(vt.second);
150                     TrxHandleLock lock(*trx);
151 
152                     if (trx->is_committed() == false)
153                     {
154                         log_warn << "trx not committed in purge and discard: "
155                                  << *trx;
156                     }
157 
158                     if (trx->depends_seqno() > -1)
159                     {
160                         cert_.purge_for_trx(trx);
161                     }
162 
163                     if (trx->refcnt() > 1)
164                     {
165                         log_debug << "trx "     << trx->trx_id()
166                                   << " refcnt " << trx->refcnt();
167                     }
168                 }
169                 vt.second->unref();
170             }
171 
PurgeAndDiscard(const PurgeAndDiscard & other)172             PurgeAndDiscard(const PurgeAndDiscard& other) : cert_(other.cert_)
173             { }
174 
175         private:
176 
177             void operator=(const PurgeAndDiscard&);
178             Certification& cert_;
179         };
180 
181         int           version_;
182         gu::Config&   conf_;
183         TrxMap        trx_map_;
184         CertIndex     cert_index_;
185         CertIndexNG   cert_index_ng_;
186         DepsSet       deps_set_;
187         ServiceThd&   service_thd_;
188         gu::Mutex     mutex_;
189         size_t        trx_size_warn_count_;
190         wsrep_seqno_t initial_position_;
191         wsrep_seqno_t position_;
192         wsrep_seqno_t safe_to_discard_seqno_;
193         wsrep_seqno_t last_pa_unsafe_;
194         wsrep_seqno_t last_preordered_seqno_;
195         wsrep_trx_id_t last_preordered_id_;
196         gu::Mutex     stats_mutex_;
197         size_t        n_certified_;
198         wsrep_seqno_t deps_dist_;
199         wsrep_seqno_t cert_interval_;
200         size_t        index_size_;
201 
202         size_t        key_count_;
203         size_t        byte_count_;
204         size_t        trx_count_;
205 
206         /* The only reason those are not static constants is because
207          * there might be a need to thange them without recompilation.
208          * see #454 */
209         int          const max_length_; /* Purge trx_map_ when it exceeds this
210                                           * NOTE: this effectively sets a limit
211                                           * on trx certification interval */
212 
213         unsigned int const max_length_check_; /* Mask how often to check */
214 
215         bool               log_conflicts_;
216         bool               optimistic_pa_;
217     };
218 }
219 
220 #endif // GALERA_CERTIFICATION_HPP
221