1 // 2 // Copyright (C) 2010-2018 Codership Oy <info@codership.com> 3 // 4 5 #ifndef GALERA_CERTIFICATION_HPP 6 #define GALERA_CERTIFICATION_HPP 7 8 #include "trx_handle.hpp" 9 #include "key_entry_ng.hpp" 10 #include "galera_service_thd.hpp" 11 12 #include "gu_unordered.hpp" 13 #include "gu_lock.hpp" 14 #include "gu_config.hpp" 15 16 #include <map> 17 #include <set> 18 #include <list> 19 20 namespace galera 21 { 22 class Certification 23 { 24 public: 25 26 static std::string const PARAM_LOG_CONFLICTS; 27 static std::string const PARAM_OPTIMISTIC_PA; 28 29 static void register_params(gu::Config&); 30 31 typedef gu::UnorderedSet<KeyEntryOS*, 32 KeyEntryPtrHash, KeyEntryPtrEqual> CertIndex; 33 34 typedef gu::UnorderedSet<KeyEntryNG*, 35 KeyEntryPtrHashNG, KeyEntryPtrEqualNG> 36 CertIndexNG; 37 38 private: 39 40 typedef std::multiset<wsrep_seqno_t> DepsSet; 41 42 typedef std::map<wsrep_seqno_t, TrxHandle*> TrxMap; 43 44 public: 45 46 typedef enum 47 { 48 TEST_OK, 49 TEST_FAILED 50 } TestResult; 51 52 Certification(gu::Config& conf, ServiceThd& thd); 53 ~Certification(); 54 55 void assign_initial_position(wsrep_seqno_t seqno, int versiono); 56 TestResult append_trx(TrxHandle*); 57 TestResult test(TrxHandle*, bool = true); position() const58 wsrep_seqno_t position() const { return position_; } 59 60 wsrep_seqno_t get_safe_to_discard_seqno() const61 get_safe_to_discard_seqno() const 62 { 63 gu::Lock lock(mutex_); 64 return get_safe_to_discard_seqno_(); 65 } 66 67 wsrep_seqno_t purge_trxs_upto(wsrep_seqno_t const seqno,bool const handle_gcache)68 purge_trxs_upto(wsrep_seqno_t const seqno, bool const handle_gcache) 69 { 70 gu::Lock lock(mutex_); 71 const wsrep_seqno_t stds(get_safe_to_discard_seqno_()); 72 // assert(seqno <= get_safe_to_discard_seqno()); 73 // Note: setting trx committed is not done in total order so 74 // safe to discard seqno may decrease. Enable assertion above when 75 // this issue is fixed. 76 return purge_trxs_upto_(std::min(seqno, stds), handle_gcache); 77 } 78 79 // Set trx corresponding to handle committed. Return purge seqno if 80 // index purge is required, -1 otherwise. 81 wsrep_seqno_t set_trx_committed(TrxHandle*); 82 TrxHandle* get_trx(wsrep_seqno_t); 83 84 // statistics section stats_get(double & avg_cert_interval,double & avg_deps_dist,size_t & index_size) const85 void stats_get(double& avg_cert_interval, 86 double& avg_deps_dist, 87 size_t& index_size) const 88 { 89 gu::Lock lock(stats_mutex_); 90 avg_cert_interval = 0; 91 avg_deps_dist = 0; 92 if (n_certified_) 93 { 94 avg_cert_interval = double(cert_interval_) / n_certified_; 95 avg_deps_dist = double(deps_dist_) / n_certified_; 96 } 97 index_size = index_size_; 98 } 99 stats_reset()100 void stats_reset() 101 { 102 gu::Lock lock(stats_mutex_); 103 cert_interval_ = 0; 104 deps_dist_ = 0; 105 n_certified_ = 0; 106 index_size_ = 0; 107 } 108 109 void param_set(const std::string& key, const std::string& value); 110 111 private: 112 113 TestResult do_test(TrxHandle*, bool); 114 TestResult do_test_v1to2(TrxHandle*, bool); 115 TestResult do_test_v3to4(TrxHandle*, bool); 116 TestResult do_test_preordered(TrxHandle*); 117 void purge_for_trx(TrxHandle*); 118 void purge_for_trx_v1to2(TrxHandle*); 119 void purge_for_trx_v3(TrxHandle*); 120 121 // unprotected variants for internal use 122 wsrep_seqno_t get_safe_to_discard_seqno_() const; 123 wsrep_seqno_t purge_trxs_upto_(wsrep_seqno_t, bool sync); 124 index_purge_required()125 bool index_purge_required() 126 { 127 static unsigned int const KEYS_THRESHOLD (1 << 10); // 1K 128 static unsigned int const BYTES_THRESHOLD(128 << 20); // 128M 129 static unsigned int const TRXS_THRESHOLD (127); 130 131 /* if either key count, byte count or trx count exceed their 132 * threshold, zero up counts and return true. */ 133 return ((key_count_ > KEYS_THRESHOLD || 134 byte_count_ > BYTES_THRESHOLD || 135 trx_count_ > TRXS_THRESHOLD) 136 && 137 (key_count_ = 0, byte_count_ = 0, trx_count_ = 0, true)); 138 } 139 140 class PurgeAndDiscard 141 { 142 public: 143 PurgeAndDiscard(Certification & cert)144 PurgeAndDiscard(Certification& cert) : cert_(cert) { } 145 operator ()(TrxMap::value_type & vt) const146 void operator()(TrxMap::value_type& vt) const 147 { 148 { 149 TrxHandle* trx(vt.second); 150 TrxHandleLock lock(*trx); 151 152 if (trx->is_committed() == false) 153 { 154 log_warn << "trx not committed in purge and discard: " 155 << *trx; 156 } 157 158 if (trx->depends_seqno() > -1) 159 { 160 cert_.purge_for_trx(trx); 161 } 162 163 if (trx->refcnt() > 1) 164 { 165 log_debug << "trx " << trx->trx_id() 166 << " refcnt " << trx->refcnt(); 167 } 168 } 169 vt.second->unref(); 170 } 171 PurgeAndDiscard(const PurgeAndDiscard & other)172 PurgeAndDiscard(const PurgeAndDiscard& other) : cert_(other.cert_) 173 { } 174 175 private: 176 177 void operator=(const PurgeAndDiscard&); 178 Certification& cert_; 179 }; 180 181 int version_; 182 gu::Config& conf_; 183 TrxMap trx_map_; 184 CertIndex cert_index_; 185 CertIndexNG cert_index_ng_; 186 DepsSet deps_set_; 187 ServiceThd& service_thd_; 188 gu::Mutex mutex_; 189 size_t trx_size_warn_count_; 190 wsrep_seqno_t initial_position_; 191 wsrep_seqno_t position_; 192 wsrep_seqno_t safe_to_discard_seqno_; 193 wsrep_seqno_t last_pa_unsafe_; 194 wsrep_seqno_t last_preordered_seqno_; 195 wsrep_trx_id_t last_preordered_id_; 196 gu::Mutex stats_mutex_; 197 size_t n_certified_; 198 wsrep_seqno_t deps_dist_; 199 wsrep_seqno_t cert_interval_; 200 size_t index_size_; 201 202 size_t key_count_; 203 size_t byte_count_; 204 size_t trx_count_; 205 206 /* The only reason those are not static constants is because 207 * there might be a need to thange them without recompilation. 208 * see #454 */ 209 int const max_length_; /* Purge trx_map_ when it exceeds this 210 * NOTE: this effectively sets a limit 211 * on trx certification interval */ 212 213 unsigned int const max_length_check_; /* Mask how often to check */ 214 215 bool log_conflicts_; 216 bool optimistic_pa_; 217 }; 218 } 219 220 #endif // GALERA_CERTIFICATION_HPP 221