1 // 2 // Copyright (C) 2010-2018 Codership Oy <info@codership.com> 3 // 4 5 #ifndef GALERA_CERTIFICATION_HPP 6 #define GALERA_CERTIFICATION_HPP 7 8 9 #include "nbo.hpp" 10 #include "trx_handle.hpp" 11 #include "key_entry_ng.hpp" 12 #include "galera_service_thd.hpp" 13 #include "galera_view.hpp" 14 15 #include <gu_shared_ptr.hpp> 16 #include <gu_unordered.hpp> 17 #include <gu_lock.hpp> 18 #include <gu_config.hpp> 19 #include <gu_gtid.hpp> 20 21 #include <map> 22 #include <list> 23 24 namespace galera 25 { 26 class Certification 27 { 28 public: 29 30 static std::string const PARAM_LOG_CONFLICTS; 31 static std::string const PARAM_OPTIMISTIC_PA; 32 33 static void register_params(gu::Config&); 34 35 typedef gu::UnorderedSet<KeyEntryOS*, 36 KeyEntryPtrHash, KeyEntryPtrEqual> CertIndex; 37 38 typedef gu::UnorderedSet<KeyEntryNG*, 39 KeyEntryPtrHashNG, KeyEntryPtrEqualNG> 40 CertIndexNG; 41 42 typedef gu::UnorderedMultiset<KeyEntryNG*, 43 KeyEntryPtrHashNG, KeyEntryPtrEqualNG> 44 CertIndexNBO; 45 46 private: 47 48 typedef std::multiset<wsrep_seqno_t> DepsSet; 49 50 typedef std::map<wsrep_seqno_t, TrxHandleSlavePtr> TrxMap; 51 52 public: 53 54 typedef enum 55 { 56 TEST_OK, 57 TEST_FAILED 58 } TestResult; 59 60 Certification(gu::Config& conf, ServiceThd* thd); 61 ~Certification(); 62 63 void assign_initial_position(const gu::GTID& gtid, int version); 64 TestResult append_trx(const TrxHandleSlavePtr&); 65 TestResult test(const TrxHandleSlavePtr&, bool store_keys); position() const66 wsrep_seqno_t position() const { return position_; } 67 wsrep_seqno_t increment_position(); /* for dummy IST events */ 68 69 /* this is for configuration change use */ 70 void adjust_position(const View&, const gu::GTID& gtid, int version); 71 72 wsrep_seqno_t get_safe_to_discard_seqno() const73 get_safe_to_discard_seqno() const 74 { 75 gu::Lock lock(mutex_); 76 return get_safe_to_discard_seqno_(); 77 } 78 79 wsrep_seqno_t purge_trxs_upto(wsrep_seqno_t const seqno,bool const handle_gcache)80 purge_trxs_upto(wsrep_seqno_t const seqno, bool const handle_gcache) 81 { 82 gu::Lock lock(mutex_); 83 const wsrep_seqno_t stds(get_safe_to_discard_seqno_()); 84 // assert(seqno <= get_safe_to_discard_seqno()); 85 // Note: setting trx committed is not done in total order so 86 // safe to discard seqno may decrease. Enable assertion above when 87 // this issue is fixed. 88 return purge_trxs_upto_(std::min(seqno, stds), handle_gcache); 89 } 90 91 // Set trx corresponding to handle committed. Return purge seqno if 92 // index purge is required, -1 otherwise. 93 wsrep_seqno_t set_trx_committed(TrxHandleSlave&); 94 95 // statistics section stats_get(double & avg_cert_interval,double & avg_deps_dist,size_t & index_size) const96 void stats_get(double& avg_cert_interval, 97 double& avg_deps_dist, 98 size_t& index_size) const 99 { 100 gu::Lock lock(stats_mutex_); 101 avg_cert_interval = 0; 102 avg_deps_dist = 0; 103 if (n_certified_) 104 { 105 avg_cert_interval = double(cert_interval_) / n_certified_; 106 avg_deps_dist = double(deps_dist_) / n_certified_; 107 } 108 index_size = index_size_; 109 } 110 stats_reset()111 void stats_reset() 112 { 113 gu::Lock lock(stats_mutex_); 114 cert_interval_ = 0; 115 deps_dist_ = 0; 116 n_certified_ = 0; 117 index_size_ = 0; 118 } 119 120 void param_set(const std::string& key, const std::string& value); 121 lowest_trx_seqno() const122 wsrep_seqno_t lowest_trx_seqno() const 123 { 124 return (trx_map_.empty() ? position_ : trx_map_.begin()->first); 125 } 126 127 // 128 // NBO context lifecycle: 129 // * Context is created when NBO-start event is received 130 // * Context stays in nbo_ctx_map_ until client calls erase_nbo_ctx() 131 // 132 133 // Get NBO context matching to global seqno 134 gu::shared_ptr<NBOCtx>::type nbo_ctx(wsrep_seqno_t); 135 // Erase NBO context entry 136 void erase_nbo_ctx(wsrep_seqno_t); nbo_size() const137 size_t nbo_size() const { return nbo_map_.size(); } 138 139 void mark_inconsistent(); is_inconsistent() const140 bool is_inconsistent() const { return inconsistent_; } 141 142 private: 143 144 // Non-copyable 145 Certification(const Certification&); 146 Certification& operator=(const Certification&); 147 148 TestResult do_test(const TrxHandleSlavePtr&, bool store_keys); 149 TestResult do_test_v3to5(TrxHandleSlave*, bool); 150 TestResult do_test_preordered(TrxHandleSlave*); 151 TestResult do_test_nbo(const TrxHandleSlavePtr&); 152 void purge_for_trx(TrxHandleSlave*); 153 154 // unprotected variants for internal use 155 wsrep_seqno_t get_safe_to_discard_seqno_() const; 156 wsrep_seqno_t purge_trxs_upto_(wsrep_seqno_t, bool sync); 157 158 gu::shared_ptr<NBOCtx>::type nbo_ctx_unlocked(wsrep_seqno_t); 159 index_purge_required()160 bool index_purge_required() 161 { 162 static unsigned int const KEYS_THRESHOLD (1 << 10); // 1K 163 static unsigned int const BYTES_THRESHOLD(128 << 20); // 128M 164 static unsigned int const TRXS_THRESHOLD (127); 165 166 /* if either key count, byte count or trx count exceed their 167 * threshold, zero up counts and return true. */ 168 return ((key_count_ > KEYS_THRESHOLD || 169 byte_count_ > BYTES_THRESHOLD || 170 trx_count_ > TRXS_THRESHOLD) 171 && 172 (key_count_ = 0, byte_count_ = 0, trx_count_ = 0, true)); 173 } 174 175 class PurgeAndDiscard 176 { 177 public: 178 PurgeAndDiscard(Certification & cert)179 PurgeAndDiscard(Certification& cert) : cert_(cert) { } 180 operator ()(TrxMap::value_type & vt) const181 void operator()(TrxMap::value_type& vt) const 182 { 183 { 184 TrxHandleSlave* trx(vt.second.get()); 185 // Trying to lock trx mutex here may cause deadlock 186 // with streaming replication. Locking can be skipped 187 // because trx is only read here and refcount uses atomics. 188 // Memory barrier is provided by certification mutex. 189 // 190 // TrxHandleLock lock(*trx); 191 192 if (!cert_.is_inconsistent()) 193 { 194 assert(trx->is_committed() == true); 195 if (trx->is_committed() == false) 196 { 197 log_warn <<"trx not committed in purge and discard: " 198 << *trx; 199 } 200 } 201 202 // If depends seqno is not WSREP_SEQNO_UNDEFINED 203 // write set certification has passed and keys have been 204 // inserted into index and purge is needed. 205 // TOI write sets will always pass regular certification 206 // and keys will be inserted, however if they fail 207 // NBO certification depends seqno is set to 208 // WSREP_SEQNO_UNDEFINED. Therefore purge should always 209 // be done for TOI write sets. 210 if (trx->is_dummy() == false || trx->is_toi() == true) 211 { 212 cert_.purge_for_trx(trx); 213 } 214 } 215 } 216 PurgeAndDiscard(const PurgeAndDiscard & other)217 PurgeAndDiscard(const PurgeAndDiscard& other) : cert_(other.cert_) 218 { } 219 220 private: 221 222 void operator=(const PurgeAndDiscard&); 223 Certification& cert_; 224 }; 225 226 int version_; 227 gu::Config& conf_; 228 TrxMap trx_map_; 229 CertIndexNG cert_index_ng_; 230 NBOMap nbo_map_; 231 NBOCtxMap nbo_ctx_map_; 232 CertIndexNBO nbo_index_; 233 TrxHandleSlave::Pool nbo_pool_; 234 DepsSet deps_set_; 235 View current_view_; 236 ServiceThd* service_thd_; 237 gu::Mutex mutex_; 238 size_t trx_size_warn_count_; 239 wsrep_seqno_t initial_position_; 240 wsrep_seqno_t position_; 241 wsrep_seqno_t nbo_position_; 242 wsrep_seqno_t safe_to_discard_seqno_; 243 wsrep_seqno_t last_pa_unsafe_; 244 wsrep_seqno_t last_preordered_seqno_; 245 wsrep_trx_id_t last_preordered_id_; 246 gu::Mutex stats_mutex_; 247 size_t n_certified_; 248 wsrep_seqno_t deps_dist_; 249 wsrep_seqno_t cert_interval_; 250 size_t index_size_; 251 252 size_t key_count_; 253 size_t byte_count_; 254 size_t trx_count_; 255 256 /* The only reason those are not static constants is because 257 * there might be a need to thange them without recompilation. 258 * see #454 */ 259 int const max_length_; /* Purge trx_map_ when it exceeds this 260 * NOTE: this effectively sets a limit 261 * on trx certification interval */ 262 263 unsigned int const max_length_check_; /* Mask how often to check */ 264 265 bool inconsistent_; 266 bool log_conflicts_; 267 bool optimistic_pa_; 268 }; 269 } 270 271 #endif // GALERA_CERTIFICATION_HPP 272