1 //
2 // Copyright (C) 2010-2018 Codership Oy <info@codership.com>
3 //
4 
5 #include "certification.hpp"
6 
7 #include "gu_lock.hpp"
8 #include "gu_throw.hpp"
9 
10 #include <map>
11 #include <algorithm> // std::for_each
12 
13 
14 using namespace galera;
15 
16 static const bool cert_debug_on(false);
17 #define cert_debug                              \
18     if (cert_debug_on == false) { }             \
19     else log_info << "cert debug: "
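// Editor's note - illustrative usage sketch (not from the original source):
// a statement such as
//
//     cert_debug << "processing key " << key;
//
// expands to `if (cert_debug_on == false) { } else log_info << "cert debug: "
// << "processing key " << key;`, i.e. it compiles down to a dead branch unless
// cert_debug_on above is changed to true and the module is rebuilt.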
20 
21 #define CERT_PARAM_LOG_CONFLICTS galera::Certification::PARAM_LOG_CONFLICTS
22 #define CERT_PARAM_OPTIMISTIC_PA galera::Certification::PARAM_OPTIMISTIC_PA
23 
24 static std::string const CERT_PARAM_PREFIX("cert.");
25 
26 std::string const CERT_PARAM_LOG_CONFLICTS(CERT_PARAM_PREFIX + "log_conflicts");
27 std::string const CERT_PARAM_OPTIMISTIC_PA(CERT_PARAM_PREFIX + "optimistic_pa");
28 
29 static std::string const CERT_PARAM_MAX_LENGTH   (CERT_PARAM_PREFIX +
30                                                   "max_length");
31 static std::string const CERT_PARAM_LENGTH_CHECK (CERT_PARAM_PREFIX +
32                                                   "length_check");
33 
34 static std::string const CERT_PARAM_LOG_CONFLICTS_DEFAULT("no");
35 static std::string const CERT_PARAM_OPTIMISTIC_PA_DEFAULT("yes");
36 
37 /*** It is EXTREMELY important that these constants are the same on all nodes.
38  *** Don't change them ever!!! ***/
39 static std::string const CERT_PARAM_MAX_LENGTH_DEFAULT("16384");
40 static std::string const CERT_PARAM_LENGTH_CHECK_DEFAULT("127");
41 
42 void
43 galera::Certification::register_params(gu::Config& cnf)
44 {
45     cnf.add(CERT_PARAM_LOG_CONFLICTS, CERT_PARAM_LOG_CONFLICTS_DEFAULT);
46     cnf.add(CERT_PARAM_OPTIMISTIC_PA, CERT_PARAM_OPTIMISTIC_PA_DEFAULT);
47     /* The defaults below are deliberately not reflected in conf: people
48      * should not know about these dangerous settings unless they read RTFM. */
49     cnf.add(CERT_PARAM_MAX_LENGTH);
50     cnf.add(CERT_PARAM_LENGTH_CHECK);
51 }
52 
53 /* a function to get around unset defaults in ctor initialization list */
54 static int
55 max_length(const gu::Config& conf)
56 {
57     if (conf.is_set(CERT_PARAM_MAX_LENGTH))
58         return conf.get<int>(CERT_PARAM_MAX_LENGTH);
59     else
60         return gu::Config::from_config<int>(CERT_PARAM_MAX_LENGTH_DEFAULT);
61 }
62 
63 /* a function to get around unset defaults in ctor initialization list */
64 static int
65 length_check(const gu::Config& conf)
66 {
67     if (conf.is_set(CERT_PARAM_LENGTH_CHECK))
68         return conf.get<int>(CERT_PARAM_LENGTH_CHECK);
69     else
70         return gu::Config::from_config<int>(CERT_PARAM_LENGTH_CHECK_DEFAULT);
71 }
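// Editor's sketch (hypothetical usage, for illustration only): both helpers
// resolve to "explicit config value if set, otherwise the compiled-in
// default", e.g.
//
//     gu::Config cnf;
//     galera::Certification::register_params(cnf); // leaves cert.max_length unset
//     int const ml(max_length(cnf));   // falls back to 16384
//     int const lc(length_check(cnf)); // falls back to 127
//
// Setting cert.max_length in cnf beforehand would make max_length() return
// that value instead.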
72 
73 // Purge key set from given index
74 static void purge_key_set(galera::Certification::CertIndexNG& cert_index,
75                           galera::TrxHandleSlave*             ts,
76                           const galera::KeySetIn&             key_set,
77                           const long                          count)
78 {
79     for (long i(0); i < count; ++i)
80     {
81         const galera::KeySet::KeyPart& kp(key_set.next());
82         galera::KeyEntryNG ke(kp);
83         galera::Certification::CertIndexNG::iterator ci(cert_index.find(&ke));
84         assert(ci != cert_index.end());
85         if (ci == cert_index.end())
86         {
87             log_warn << "Could not find key from index";
88             continue;
89         }
90         galera::KeyEntryNG* const kep(*ci);
91         assert(kep->referenced() == true);
92 
93         wsrep_key_type_t const p(kp.wsrep_type(ts->version()));
94 
95         if (kep->ref_trx(p) == ts)
96         {
97             kep->unref(p, ts);
98             if (kep->referenced() == false)
99             {
100                 cert_index.erase(ci);
101                 delete kep;
102             }
103         }
104     }
105 }
106 
107 void
108 galera::Certification::purge_for_trx(TrxHandleSlave* trx)
109 {
110     assert(mutex_.owned());
111     assert(trx->version() >= 3 && trx->version() <= 5);
112     const KeySetIn& keys(trx->write_set().keyset());
113     keys.rewind();
114     purge_key_set(cert_index_ng_, trx, keys, keys.count());
115 }
116 
117 /* Specifically for chain use in certify_and_depend_v3to5() */
118 template <wsrep_key_type_t REF_KEY_TYPE>
119 bool
120 check_against(const galera::KeyEntryNG*   const found,
121               const galera::KeySet::KeyPart&    key,
122               wsrep_key_type_t            const key_type,
123               galera::TrxHandleSlave*     const trx,
124               bool                        const log_conflict,
125               wsrep_seqno_t&                    depends_seqno)
126 {
127     enum CheckType
128     {
129         CONFLICT,
130         DEPENDENCY,
131         NOTHING
132     };
133 
134     static CheckType const check_table
135         [WSREP_KEY_EXCLUSIVE+1][WSREP_KEY_EXCLUSIVE+1] =
136         {
137             //  SH          RE            UP         EX   second / first
138             { NOTHING,    NOTHING,    DEPENDENCY, DEPENDENCY },  // SH
139             { NOTHING,    NOTHING,    DEPENDENCY, CONFLICT   },  // RE
140             { DEPENDENCY, DEPENDENCY, CONFLICT,   CONFLICT   },  // UP
141             { CONFLICT,   CONFLICT,   CONFLICT,   CONFLICT   }   // EX
142         };
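    // Editor's worked example (illustration only): check_table is indexed as
    // check_table[REF_KEY_TYPE][key_type], i.e. the already-indexed reference
    // key type selects the row and the incoming trx key type the column. With
    // REF_KEY_TYPE == WSREP_KEY_UPDATE, an incoming WSREP_KEY_REFERENCE key
    // yields DEPENDENCY (depends_seqno is raised below), while an incoming
    // WSREP_KEY_EXCLUSIVE key yields CONFLICT.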
143 
144     const galera::TrxHandleSlave* const ref_trx(found->ref_trx(REF_KEY_TYPE));
145 
146     // trx should not have any references in index at this point
147     assert(ref_trx != trx);
148 
149     bool conflict(false);
150 
151     if (gu_likely(0 != ref_trx))
152     {
153         if ((REF_KEY_TYPE == WSREP_KEY_EXCLUSIVE ||
154              REF_KEY_TYPE == WSREP_KEY_UPDATE) && ref_trx)
155         {
156             cert_debug << KeySet::type(REF_KEY_TYPE) << " match: "
157                        << *trx << " <---> " << *ref_trx;
158         }
159 
160         if (REF_KEY_TYPE == WSREP_KEY_SHARED    ||
161             REF_KEY_TYPE == WSREP_KEY_REFERENCE ||
162             REF_KEY_TYPE == WSREP_KEY_UPDATE) assert(!ref_trx->is_toi());
163 
164         CheckType const check_type(check_table[REF_KEY_TYPE][key_type]);
165 
166         switch (check_type)
167         {
168         case CONFLICT:
169             // A cert conflict takes place when the write sets are within
170             // the cert range and either
171             // 1) they originate from different nodes, or
172             // 2) ref_trx is in isolation (TOI) mode,
173             // and additionally 3) trx has not been certified yet. Already
174             // certified trxs show up here during index rebuild.
175             conflict = (ref_trx->global_seqno() > trx->last_seen_seqno() &&
176                         (ref_trx->is_toi() ||
177                          trx->source_id() != ref_trx->source_id()) &&
178                         trx->certified() == false);
179 
180             if (gu_unlikely(cert_debug_on || (conflict && log_conflict == true)))
181             {
182                 log_info << KeySet::type(key_type) << '-'
183                          << KeySet::type(REF_KEY_TYPE)
184                          << " trx " << (conflict ? "conflict" : "match")
185                          << " for key " << key << ": "
186                          << *trx << " <---> " << *ref_trx;
187             }
188             /* fall through */
189         case DEPENDENCY:
190             depends_seqno = std::max(ref_trx->global_seqno(), depends_seqno);
191             /* fall through */
192         case NOTHING:;
193         }
194     }
195 
196     return conflict;
197 }
198 
199 /*! for convenience returns true if conflict and false if not */
200 static inline bool
201 certify_and_depend_v3to5(const galera::KeyEntryNG*   const found,
202                          const galera::KeySet::KeyPart&    key,
203                          galera::TrxHandleSlave*     const trx,
204                          bool                        const log_conflict)
205 {
206     bool ret(false);
207     wsrep_seqno_t depends_seqno(trx->depends_seqno());
208     wsrep_key_type_t const key_type(key.wsrep_type(trx->version()));
209 
210     /*
211      * The following cascade implements these rules:
212      *
213      *      | ex | up | re | sh |  <- horizontal axis: trx   key type
214      *   ------------------------     vertical   axis: found key type
215      *   ex | C  | C  | C  | C  |
216      *   ------------------------
217      *   up | C  | C  | D  | D  |
218      *   ------------------------     C - conflict
219      *   re | C  | D  | N  | N  |     D - dependency
220      *   ------------------------     N - nothing
221      *   sh | D  | D  | N  | N  |
222      *   ------------------------
223      *
224      * Note that depends_seqno is an in/out parameter and is updated on every
225      * step.
226      */
227     if (check_against<WSREP_KEY_EXCLUSIVE>
228         (found, key, key_type, trx, log_conflict, depends_seqno) ||
229         check_against<WSREP_KEY_UPDATE>
230         (found, key, key_type, trx, log_conflict, depends_seqno) ||
231         (key_type >= WSREP_KEY_UPDATE &&
232          /* update/exclusive keys must also be checked against reference/shared */
233          (check_against<WSREP_KEY_REFERENCE>
234           (found, key, key_type, trx, log_conflict, depends_seqno) ||
235           check_against<WSREP_KEY_SHARED>
236           (found, key, key_type, trx, log_conflict, depends_seqno))))
237     {
238         ret = true;
239     }
240 
241     if (depends_seqno > trx->depends_seqno())
242         trx->set_depends_seqno(depends_seqno);
243 
244     return ret;
245 }
246 
247 /* returns true on collision, false otherwise */
248 static bool
249 certify_v3to5(galera::Certification::CertIndexNG& cert_index_ng,
250               const galera::KeySet::KeyPart&      key,
251               galera::TrxHandleSlave*     const   trx,
252               bool                        const   store_keys,
253               bool                        const   log_conflicts)
254 {
255     galera::KeyEntryNG ke(key);
256     galera::Certification::CertIndexNG::iterator ci(cert_index_ng.find(&ke));
257 
258     if (cert_index_ng.end() == ci)
259     {
260         if (store_keys)
261         {
262             galera::KeyEntryNG* const kep(new galera::KeyEntryNG(ke));
263             ci = cert_index_ng.insert(kep).first;
264 
265             cert_debug << "created new entry";
266         }
267         return false;
268     }
269     else
270     {
271         cert_debug << "found existing entry";
272 
273         galera::KeyEntryNG* const kep(*ci);
274         // Note: Since we skip certification for isolated trxs, only the
275         // cert index and key_list are populated.
276         return (!trx->is_toi() &&
277                 certify_and_depend_v3to5(kep, key, trx, log_conflicts));
278     }
279 }
280 
281 // Add key to trx references for trx that passed certification.
282 //
283 // @param cert_index certification index in use
284 // @param trx        certified transaction
285 // @param key_set    key_set used in certification
286 // @param key_count  number of keys in key set
287 static void do_ref_keys(galera::Certification::CertIndexNG& cert_index,
288                         galera::TrxHandleSlave*       const trx,
289                         const galera::KeySetIn&             key_set,
290                         const long                          key_count)
291 {
292     for (long i(0); i < key_count; ++i)
293     {
294         const galera::KeySet::KeyPart& k(key_set.next());
295         galera::KeyEntryNG ke(k);
296         galera::Certification::CertIndexNG::const_iterator
297             ci(cert_index.find(&ke));
298 
299         if (ci == cert_index.end())
300         {
301             gu_throw_fatal << "could not find key '" << k
302                            << "' from cert index";
303         }
304         (*ci)->ref(k.wsrep_type(trx->version()), k, trx);
305     }
306 }
307 
308 // Clean up keys from index that were added by trx that failed
309 // certification.
310 //
311 // @param cert_index certification index
312 // @param key_set    key_set used in certification
313 // @param processed  number of keys that were processed in certification
314 static void do_clean_keys(galera::Certification::CertIndexNG& cert_index,
315                           const galera::TrxHandleSlave* const trx,
316                           const galera::KeySetIn&             key_set,
317                           const long                          processed)
318 {
319     /* The 'strictly less' comparison is essential in the following loop:
320      * the key at position 'processed' failed cert and was not added to index */
321     for (long i(0); i < processed; ++i)
322     {
323         KeyEntryNG ke(key_set.next());
324 
325         // Clean up cert index from entries which were added by this trx
326         galera::Certification::CertIndexNG::iterator ci(cert_index.find(&ke));
327 
328         if (gu_likely(ci != cert_index.end()))
329         {
330             galera::KeyEntryNG* kep(*ci);
331 
332             if (kep->referenced() == false)
333             {
334                 // kep was added to cert_index_ by this trx -
335                 // remove from cert_index_ and fall through to delete
336                 cert_index.erase(ci);
337             }
338             else continue;
339 
340             assert(kep->referenced() == false);
341 
342             delete kep;
343         }
344         else if(ke.key().wsrep_type(trx->version()) == WSREP_KEY_SHARED)
345         {
346             assert(0); // we actually should never be here, the key should
347                        // be either added to cert_index_ or be there already
348             log_warn  << "could not find shared key '"
349                       << ke.key() << "' from cert index";
350         }
351         else { /* non-shared keys can duplicate shared in the key set */ }
352     }
353 }
354 
355 galera::Certification::TestResult
356 galera::Certification::do_test_v3to5(TrxHandleSlave* trx, bool store_keys)
357 {
358     cert_debug << "BEGIN CERTIFICATION v" << trx->version() << ": " << *trx;
359 
360 #ifndef NDEBUG
361     // to check that cleanup after cert failure returns cert_index
362     // to original size
363     size_t prev_cert_index_size(cert_index_ng_.size());
364 #endif // NDEBUG
365 
366     const KeySetIn& key_set(trx->write_set().keyset());
367     long const      key_count(key_set.count());
368     long            processed(0);
369 
370     key_set.rewind();
371 
372     for (; processed < key_count; ++processed)
373     {
374         const KeySet::KeyPart& key(key_set.next());
375 
376         if (certify_v3to5(cert_index_ng_, key, trx, store_keys, log_conflicts_))
377         {
378             trx->set_depends_seqno(std::max(trx->depends_seqno(), last_pa_unsafe_));
379             goto cert_fail;
380         }
381     }
382 
383     trx->set_depends_seqno(std::max(trx->depends_seqno(), last_pa_unsafe_));
384 
385     if (store_keys == true)
386     {
387         assert (key_count == processed);
388         key_set.rewind();
389         do_ref_keys(cert_index_ng_, trx, key_set, key_count);
390 
391         if (trx->pa_unsafe()) last_pa_unsafe_ = trx->global_seqno();
392 
393         key_count_ += key_count;
394     }
395     cert_debug << "END CERTIFICATION (success): " << *trx;
396     return TEST_OK;
397 
398 cert_fail:
399 
400     cert_debug << "END CERTIFICATION (failed): " << *trx;
401 
402     assert (processed < key_count);
403 
404     if (store_keys == true)
405     {
406         /* Clean up key entries allocated for this trx */
407         key_set.rewind();
408         do_clean_keys(cert_index_ng_, trx, key_set, processed);
409         assert(cert_index_ng_.size() == prev_cert_index_size);
410     }
411 
412     return TEST_FAILED;
413 }
414 
415 /* Determine whether a given trx can be correctly certified under the
416  * certification protocol currently established in the group (cert_version).
417  * Certification protocols 1 to 3 could only handle writesets of the same
418  * version. Certification protocols 4 and later can handle writesets from
419  * version 3 up to the certification protocol version. */
420 static inline bool
421 trx_cert_version_match(int const trx_version, int const cert_version)
422 {
423     if (cert_version <= 3)
424     {
425         return (trx_version == cert_version);
426     }
427     else
428     {
429         return (trx_version >= 3 && trx_version <= cert_version);
430     }
431 }
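// Editor's example (derived from the function above): with cert_version 5 the
// group accepts writeset versions 3, 4 and 5; with cert_version 4 it accepts
// 3 and 4; with cert_version 3 or lower only a writeset of exactly the same
// version matches.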
432 
433 galera::Certification::TestResult
434 galera::Certification::do_test(const TrxHandleSlavePtr& trx, bool store_keys)
435 {
436     assert(trx->source_id() != WSREP_UUID_UNDEFINED);
437 
438     if (!trx_cert_version_match(trx->version(), version_))
439     {
440         log_warn << "trx protocol version: "
441                  << trx->version()
442                  << " does not match certification protocol version: "
443                  << version_;
444         return TEST_FAILED;
445     }
446 
447     // trx->certified() == true during index rebuild from IST; do_test()
448     // must not fail, just populate the index
449     if (gu_unlikely(trx->certified() == false &&
450                     (trx->last_seen_seqno() < initial_position_ ||
451                      trx->global_seqno()-trx->last_seen_seqno() > max_length_)))
452     {
453         if (trx->global_seqno() - trx->last_seen_seqno() > max_length_)
454         {
455             log_warn << "certification interval for trx " << *trx
456                      << " exceeds the limit of " << max_length_;
457         }
458 
459         return TEST_FAILED;
460     }
461 
462     TestResult res(TEST_FAILED);
463 
464     gu::Lock lock(mutex_); // why do we need that? - e.g. set_trx_committed()
465 
466     /* initialize parent seqno */
467     if (gu_unlikely(trx_map_.empty()))
468     {
469         trx->set_depends_seqno(trx->global_seqno() - 1);
470     }
471     else
472     {
473         if (optimistic_pa_ == false &&
474             trx->last_seen_seqno() > trx->depends_seqno())
475             trx->set_depends_seqno(trx->last_seen_seqno());
476 
477         wsrep_seqno_t const ds(trx_map_.begin()->first - 1);
478         if (ds > trx->depends_seqno()) trx->set_depends_seqno(ds);
479     }
480 
481     switch (version_)
482     {
483     case 1:
484     case 2:
485         break;
486     case 3:
487     case 4:
488     case 5:
489         res = do_test_v3to5(trx.get(), store_keys);
490         break;
491     default:
492         gu_throw_fatal << "certification test for version "
493                        << version_ << " not implemented";
494     }
495 
496     if (store_keys == true && res == TEST_OK)
497     {
498         ++trx_count_;
499         gu::Lock lock(stats_mutex_);
500         ++n_certified_;
501         deps_dist_ += (trx->global_seqno() - trx->depends_seqno());
502         cert_interval_ += (trx->global_seqno() - trx->last_seen_seqno() - 1);
503         index_size_ = cert_index_ng_.size();
504     }
505 
506     // Additional NBO certification.
507     if (trx->flags() & TrxHandle::F_ISOLATION)
508     {
509         res = do_test_nbo(trx);
510         assert(TEST_FAILED == res || trx->depends_seqno() >= 0);
511     }
512 
513     byte_count_ += trx->size();
514 
515     return res;
516 }
517 
518 
519 galera::Certification::TestResult
520 galera::Certification::do_test_preordered(TrxHandleSlave* trx)
521 {
522     /* Source ID is not always available for preordered events (e.g. event
523      * producer didn't provide any) so for now we must accept undefined IDs. */
524     //assert(trx->source_id() != WSREP_UUID_UNDEFINED);
525 
526     assert(trx->version() >= 3);
527     assert(trx->preordered());
528 
529     /* we don't want to go any further unless the writeset checksum is ok */
530     trx->verify_checksum(); // throws
531     /* if checksum failed we need to throw ASAP, let the caller catch it,
532      * flush monitors, save state and abort. */
533 
534     /* This is a primitive certification test for preordered actions:
535      * it does not handle gaps and relies on general apply monitor for
536      * parallel applying. Ideally there should be a certification object
537      * per source. */
538 
539     if (gu_unlikely(last_preordered_id_ &&
540                     (last_preordered_id_ + 1 != trx->trx_id())))
541     {
542         log_warn << "Gap in preordered stream: source_id '" << trx->source_id()
543                  << "', trx_id " << trx->trx_id() << ", previous id "
544                  << last_preordered_id_;
545         assert(0);
546     }
547 
548     trx->set_depends_seqno(last_preordered_seqno_ + 1 -
549                            trx->write_set().pa_range());
550     // +1 compensates for subtracting from a previous seqno, rather than own.
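    // Editor's note (illustrative arithmetic): with last_preordered_seqno_ ==
    // 100 and write_set().pa_range() == 1, depends_seqno becomes
    // 100 + 1 - 1 == 100, i.e. the event depends on the previous preordered
    // event.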
551     trx->mark_certified();
552 
553     last_preordered_seqno_ = trx->global_seqno();
554     last_preordered_id_    = trx->trx_id();
555 
556     return TEST_OK;
557 }
558 
559 
560 //
561 // non-blocking operations
562 //
563 
564 // Prepare a copy of TrxHandleSlave with private storage
565 galera::NBOEntry copy_ts(
566     galera::TrxHandleSlave* ts,
567     galera::TrxHandleSlave::Pool& pool,
568     gu::shared_ptr<NBOCtx>::type nbo_ctx)
569 {
570     // FIXME: Pass proper working directory from config to MappedBuffer ctor
571     gu::shared_ptr<galera::MappedBuffer>::type buf(
572         new galera::MappedBuffer("/tmp"));
573     assert(ts->action().first && ts->action().second);
574     if (ts->action().first == 0)
575     {
576         gu_throw_fatal
577             << "Unassigned action pointer for transaction, cannot make a copy of: "
578             << *ts;
579     }
580 
581     buf->resize(ts->action().second);
582     std::copy(static_cast<const gu::byte_t*>(ts->action().first),
583               static_cast<const gu::byte_t*>(ts->action().first)
584               + ts->action().second,
585               buf->begin());
586 
587     galera::TrxHandleSlaveDeleter d;
588     gu::shared_ptr<galera::TrxHandleSlave>::type new_ts(
589         galera::TrxHandleSlave::New(ts->local(), pool), d);
590     if (buf->size() > size_t(std::numeric_limits<int32_t>::max()))
591         gu_throw_error(ERANGE) << "Buffer size " << buf->size()
592                                << " out of range";
593     gcs_action act = {ts->global_seqno(), ts->local_seqno(),
594                       &(*buf)[0], static_cast<int32_t>(buf->size()),
595                       GCS_ACT_WRITESET};
596     if (ts->certified() == false)
597     {
598         // TrxHandleSlave is from group
599         gu_trace(new_ts->unserialize<true>(act));
600     }
601     else
602     {
603         // TrxHandleSlave is from IST
604         gu_trace(new_ts->unserialize<false>(act));
605     }
606     new_ts->set_local(ts->local());
607     return galera::NBOEntry(new_ts, buf, nbo_ctx);
608 }
609 
610 
611 static void purge_key_set_nbo(galera::Certification::CertIndexNBO& cert_index,
612                               bool                                 is_nbo_index,
613                               galera::TrxHandleSlave*              ts,
614                               const galera::KeySetIn&              key_set,
615                               const long                           count)
616 {
617     using galera::Certification;
618     using galera::KeyEntryNG;
619     using galera::KeySet;
620 
621     key_set.rewind();
622 
623     for (long i(0); i < count; ++i)
624     {
625         KeyEntryNG ke(key_set.next());
626         std::pair<Certification::CertIndexNBO::iterator,
627                   Certification::CertIndexNBO::iterator>
628             ci_range(cert_index.equal_range(&ke));
629 
630         assert(std::distance(ci_range.first, ci_range.second) >= 1);
631 
632         wsrep_key_type_t const p(ke.key().wsrep_type(ts->version()));
633         Certification::CertIndexNBO::iterator ci;
634         for (ci = ci_range.first; ci != ci_range.second; ++ci)
635         {
636             if ((*ci)->ref_trx(p) == ts) break;
637         }
638         assert(ci != ci_range.second);
639         if (ci == ci_range.second)
640         {
641             log_warn << "purge_key_set_nbo(): Could not find key "
642                      << ke.key() << " from NBO index, skipping";
643             continue;
644         }
645 
646         KeyEntryNG* const kep(*ci);
647         assert(kep->referenced() == true);
648 
649         kep->unref(p, ts);
650         assert(kep->referenced() == false);
651         cert_index.erase(ci);
652         delete kep;
653     }
654 }
655 
656 
657 static void end_nbo(galera::NBOMap::iterator             i,
658                     galera::TrxHandleSlavePtr            ts,
659                     galera::Certification::CertIndexNBO& nbo_index,
660                     galera::NBOMap&                      nbo_map)
661 {
662     NBOEntry& e(i->second);
663 
664     log_debug << "Ending NBO started by " << *e.ts_ptr();
665 
666     // Erase entry from index
667     const KeySetIn& key_set(e.ts_ptr()->write_set().keyset());
668     purge_key_set_nbo(nbo_index, true, e.ts_ptr(), key_set, key_set.count());
669 
670     ts->set_ends_nbo(e.ts_ptr()->global_seqno());
671 
672     nbo_map.erase(i);
673 }
674 
675 
676 gu::shared_ptr<NBOCtx>::type galera::Certification::nbo_ctx_unlocked(
677     wsrep_seqno_t const seqno)
678 {
679     // This will either
680     // * Insert a new NBOCtx shared_ptr into ctx map if one didn't exist
681     //   before, or
682     // * Return existing entry, while newly created shared ptr gets freed
683     //   automatically when it goes out of scope
684     return nbo_ctx_map_.insert(
685         std::make_pair(seqno,
686                        gu::make_shared<NBOCtx>())).first->second;
687 }
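// Editor's note: std::map::insert() returns a std::pair<iterator, bool>, so
// `.first->second` above yields the NBOCtx already stored under `seqno` when
// the insert was a no-op, or the freshly created one otherwise - exactly one
// shared NBOCtx per seqno either way.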
688 
689 gu::shared_ptr<NBOCtx>::type galera::Certification::nbo_ctx(
690     wsrep_seqno_t const seqno)
691 {
692     assert(seqno > 0);
693     gu::Lock lock(mutex_);
694     return nbo_ctx_unlocked(seqno);
695 }
696 
697 void galera::Certification::erase_nbo_ctx(wsrep_seqno_t const seqno)
698 {
699     assert(seqno > 0);
700     gu::Lock lock(mutex_);
701 
702     size_t n_erased(nbo_ctx_map_.erase(seqno));
703     assert(n_erased == 1); (void)n_erased;
704 }
705 
706 
707 static bool is_exclusive(const galera::KeyEntryNG* ke)
708 {
709     assert(ke != 0);
710     assert((ke->ref_trx(WSREP_KEY_SHARED) ||
711             ke->ref_trx(WSREP_KEY_REFERENCE) ||
712             ke->ref_trx(WSREP_KEY_UPDATE) ||
713             ke->ref_trx(WSREP_KEY_EXCLUSIVE)) &&
714            !(ke->ref_trx(WSREP_KEY_SHARED) &&
715              ke->ref_trx(WSREP_KEY_REFERENCE) &&
716              ke->ref_trx(WSREP_KEY_UPDATE) &&
717              ke->ref_trx(WSREP_KEY_EXCLUSIVE)));
718     return (ke->ref_trx(WSREP_KEY_EXCLUSIVE) != 0 ||
719             ke->ref_trx(WSREP_KEY_UPDATE) != 0);
720 }
721 
722 static bool
723 certify_nbo(galera::Certification::CertIndexNBO& cert_index,
724             const galera::KeySet::KeyPart&       key,
725             galera::TrxHandleSlave* const        trx,
726             bool                    const        log_conflicts)
727 {
728     using galera::KeyEntryNG;
729     using galera::Certification;
730     using galera::TrxHandleSlave;
731 
732     KeyEntryNG ke(key);
733     std::pair<Certification::CertIndexNBO::iterator,
734               Certification::CertIndexNBO::iterator>
735         it(cert_index.equal_range(&ke));
736 
737     // Certification is done over the whole index, as opposed to regular
738     // write set certification where only a given range is used.
739 
740     // If found range is non-empty, it must be either single exclusive
741     // key or all shared.
742     assert(std::count_if(it.first, it.second, is_exclusive) == 0 ||
743            std::distance(it.first, it.second) == 1);
744 
745     Certification::CertIndexNBO::iterator i;
746     if ((i = std::find_if(it.first, it.second, is_exclusive)) != it.second)
747     {
748         if (gu_unlikely(log_conflicts == true))
749         {
750             const TrxHandleSlave* ref_trx((*i)->ref_trx(WSREP_KEY_EXCLUSIVE));
751             assert(ref_trx != 0);
752             log_info << "NBO conflict for key " << key << ": "
753                      << *trx << " <--X--> " << *ref_trx;
754         }
755         return true;
756     }
757     return false;
758 }
759 
760 static void
761 do_ref_keys_nbo(galera::Certification::CertIndexNBO& index,
762                 TrxHandleSlave*                const trx,
763                 const galera::KeySetIn&              key_set,
764                 const long                           key_count)
765 {
766     using galera::KeySet;
767     using galera::KeyEntryNG;
768     using galera::Certification;
769 
770     key_set.rewind();
771 
772     for (long i(0); i < key_count; ++i)
773     {
774         const KeySet::KeyPart& key(key_set.next());
775         wsrep_key_type_t const type(key.wsrep_type(trx->version()));
776         KeyEntryNG* kep (new KeyEntryNG(key));
777         Certification::CertIndexNBO::iterator it;
778         assert((it = index.find(kep)) == index.end() ||
779                (*it)->ref_trx(type) != trx);
780         it = index.insert(kep);
781         (*it)->ref(type, key, trx);
782     }
783 }
784 
785 
786 galera::Certification::TestResult galera::Certification::do_test_nbo(
787     const TrxHandleSlavePtr& ts)
788 {
789     assert(!ts->is_dummy());
790     assert(ts->flags() & TrxHandle::F_ISOLATION);
791     assert(ts->flags() & (TrxHandle::F_BEGIN | TrxHandle::F_COMMIT));
792 
793     if (nbo_position_ >= ts->global_seqno())
794     {
795         // This is part of cert index preload, needs to be dropped since
796         // it is already processed by this node before partitioning.
797         assert(ts->certified() == true);
798         // Return TEST_OK. If the trx is in index preload, it has
799         // passed certification on donor.
800         log_debug << "Dropping NBO " << *ts;
801         return TEST_OK;
802     }
803     nbo_position_ = ts->global_seqno();
804 
805 #ifndef NDEBUG
806     size_t prev_nbo_index_size(nbo_index_.size());
807 #endif // NDEBUG
808 
809     bool ineffective(false);
810 
811     galera::Certification::TestResult ret(TEST_OK);
812     if ((ts->flags() & TrxHandle::F_BEGIN) &&
813         (ts->flags() & TrxHandle::F_COMMIT))
814     {
815         // Old school atomic TOI
816         log_debug << "TOI: " << *ts;
817         const KeySetIn& key_set(ts->write_set().keyset());
818         long const      key_count(key_set.count());
819         long            processed(0);
820 
821         key_set.rewind();
822         for (; processed < key_count; ++processed)
823         {
824             const KeySet::KeyPart& key(key_set.next());
825             if (certify_nbo(nbo_index_, key, ts.get(), log_conflicts_))
826             {
827                 ret = TEST_FAILED;
828                 break;
829             }
830         }
831         log_debug << "NBO test result " << ret << " for TOI " << *ts;
832         // Atomic TOI should not cause change in NBO index
833         assert(prev_nbo_index_size == nbo_index_.size());
834     }
835     else if (ts->flags() & TrxHandle::F_BEGIN)
836     {
837         // Beginning of non-blocking operation
838         log_debug << "NBO start: " << *ts;
839         // We need a copy of ts since the lifetime of NBO may exceed
840         // the lifetime of the buffer in GCache
841         NBOEntry entry(copy_ts(ts.get(), nbo_pool_, nbo_ctx_unlocked(
842                                    ts->global_seqno())));
843 
844         TrxHandleSlave* new_ts(entry.ts_ptr());
845         const KeySetIn& key_set(new_ts->write_set().keyset());
846         long const      key_count(key_set.count());
847         long            processed(0);
848 
849         key_set.rewind();
850         for (; processed < key_count; ++processed)
851         {
852             const KeySet::KeyPart& key(key_set.next());
853             if (certify_nbo(nbo_index_, key, new_ts, log_conflicts_))
854             {
855                 ret = TEST_FAILED;
856                 break;
857             }
858         }
859 
860         switch (ret)
861         {
862         case TEST_OK:
863             do_ref_keys_nbo(nbo_index_, new_ts, key_set, key_count);
864             nbo_map_.insert(std::make_pair(new_ts->global_seqno(),
865                                            entry));
866             break;
867         case TEST_FAILED:
868             // Clean keys not needed here since certify_nbo()
869             // does not insert them into nbo_index_
870             break;
871         }
872     }
873     else
874     {
875         assert(ts->nbo_end());
876         // End of non-blocking operation
877         log_debug << "NBO end: " << *ts;
878         ineffective = true;
879 
880         NBOKey key;
881         const DataSetIn& ws(ts->write_set().dataset());
882         ws.rewind();
883         assert(ws.count() == 1);
884         if (ws.count() != 1) gu_throw_fatal << "Invalid dataset count in "
885                                             << *ts;
886         gu::Buf buf(ws.next());
887         key.unserialize(static_cast<const gu::byte_t*>(buf.ptr), buf.size, 0);
888 
889         NBOMap::iterator i(nbo_map_.find(key));
890         if (i != nbo_map_.end())
891         {
892             NBOEntry& e(i->second);
893             e.add_ended(ts->source_id());
894             if (ts->local() == true)
895             {
896                 // Clear NBO context aborted flag if it is set by
897                 // intermediate view change.
898                 e.nbo_ctx()->set_aborted(false);
899             }
900 
901             if (current_view_.subset_of(e.ended_set()))
902             {
903                 // All nodes of the current primary view have
904                 // ended the operation.
905                 end_nbo(i, ts, nbo_index_, nbo_map_);
906                 ineffective = false;
907             }
908         }
909         else
910         {
911             log_warn << "no corresponding NBO begin found for NBO end " << *ts;
912         }
913     }
914 
915     if (gu_likely(TEST_OK == ret))
916     {
917         ts->set_depends_seqno(ts->global_seqno() - 1);
918         if (gu_unlikely(ineffective))
919         {
920             assert(ts->nbo_end());
921             assert(ts->ends_nbo() == WSREP_SEQNO_UNDEFINED);
922             ret = TEST_FAILED;
923         }
924     }
925 
926     assert(TEST_FAILED == ret || ts->depends_seqno() >= 0);
927 
928     return ret;
929 }
930 
931 galera::Certification::Certification(gu::Config& conf, ServiceThd* thd)
932     :
933     version_               (-1),
934     conf_                  (conf),
935     trx_map_               (),
936     cert_index_ng_         (),
937     nbo_map_               (),
938     nbo_ctx_map_           (),
939     nbo_index_             (),
940     nbo_pool_              (sizeof(TrxHandleSlave)),
941     deps_set_              (),
942     current_view_          (),
943     service_thd_           (thd),
944     mutex_                 (),
945     trx_size_warn_count_   (0),
946     initial_position_      (-1),
947     position_              (-1),
948     nbo_position_          (-1),
949     safe_to_discard_seqno_ (-1),
950     last_pa_unsafe_        (-1),
951     last_preordered_seqno_ (position_),
952     last_preordered_id_    (0),
953     stats_mutex_           (),
954     n_certified_           (0),
955     deps_dist_             (0),
956     cert_interval_         (0),
957     index_size_            (0),
958     key_count_             (0),
959     byte_count_            (0),
960     trx_count_             (0),
961 
962     max_length_            (max_length(conf)),
963     max_length_check_      (length_check(conf)),
964     inconsistent_          (false),
965     log_conflicts_         (conf.get<bool>(CERT_PARAM_LOG_CONFLICTS)),
966     optimistic_pa_         (conf.get<bool>(CERT_PARAM_OPTIMISTIC_PA))
967 {}
968 
969 
970 galera::Certification::~Certification()
971 {
972     log_info << "cert index usage at exit "   << cert_index_ng_.size();
973     log_info << "cert trx map usage at exit " << trx_map_.size();
974     log_info << "deps set usage at exit "     << deps_set_.size();
975 
976     double avg_cert_interval(0);
977     double avg_deps_dist(0);
978     size_t index_size(0);
979     stats_get(avg_cert_interval, avg_deps_dist, index_size);
980     log_info << "avg deps dist "              << avg_deps_dist;
981     log_info << "avg cert interval "          << avg_cert_interval;
982     log_info << "cert index size "            << index_size;
983 
984     gu::Lock lock(mutex_);
985 
986     for_each(trx_map_.begin(), trx_map_.end(), PurgeAndDiscard(*this));
987     trx_map_.clear();
988     nbo_map_.clear();
989     std::for_each(nbo_index_.begin(), nbo_index_.end(),
990                   [](CertIndexNBO::value_type key_entry)
991                   {
992                       for (int i(0); i <= KeySet::Key::TYPE_MAX; ++i)
993                       {
994                           wsrep_key_type_t key_type(static_cast<wsrep_key_type_t>(i));
995                           const TrxHandleSlave* ts(key_entry->ref_trx(key_type));
996                           if (ts)
997                           {
998                               key_entry->unref(key_type, ts);
999                           }
1000                       }
1001                       delete key_entry;
1002                   });
1003     if (service_thd_)
1004     {
1005         service_thd_->release_seqno(position_);
1006         service_thd_->flush(gu::UUID());
1007     }
1008 }
1009 
1010 
1011 void galera::Certification::assign_initial_position(const gu::GTID& gtid,
1012                                                     int const       version)
1013 {
1014     assert(gtid.seqno() >= 0 || gtid == gu::GTID());
1015     switch (version)
1016     {
1017         // value -1 used in initialization when trx protocol version is not
1018         // available
1019     case -1:
1020     case 1:
1021     case 2:
1022     case 3:
1023     case 4:
1024     case 5:
1025         break;
1026     default:
1027         gu_throw_fatal << "certification/trx version "
1028                        << version << " not supported";
1029     }
1030 
1031     wsrep_seqno_t const seqno(gtid.seqno());
1032     gu::Lock lock(mutex_);
1033 
1034     std::for_each(trx_map_.begin(), trx_map_.end(), PurgeAndDiscard(*this));
1035 
1036     if (seqno >= position_)
1037     {
1038         assert(cert_index_ng_.size() == 0);
1039     }
1040     else
1041     {
1042         if (seqno > 0) // don't warn on index reset.
1043         {
1044             log_warn << "moving position backwards: " << position_ << " -> "
1045                      << seqno;
1046         }
1047 
1048         std::for_each(cert_index_ng_.begin(), cert_index_ng_.end(),
1049                       gu::DeleteObject());
1050         cert_index_ng_.clear();
1051     }
1052 
1053     trx_map_.clear();
1054     assert(cert_index_ng_.empty());
1055 
1056     if (service_thd_)
1057     {
1058         service_thd_->release_seqno(position_);
1059         service_thd_->flush(gtid.uuid());
1060     }
1061 
1062     log_info << "####### Assign initial position for certification: " << gtid
1063              << ", protocol version: " << version;
1064 
1065     initial_position_      = seqno;
1066     position_              = seqno;
1067     safe_to_discard_seqno_ = seqno;
1068     last_pa_unsafe_        = seqno;
1069     last_preordered_seqno_ = position_;
1070     last_preordered_id_    = 0;
1071     version_               = version;
1072 }
1073 
1074 
1075 void
1076 galera::Certification::adjust_position(const View&         view,
1077                                        const gu::GTID&     gtid,
1078                                        int           const version)
1079 {
1080     assert(gtid.uuid()  != GU_UUID_NIL);
1081     assert(gtid.seqno() >= 0);
1082 
1083     gu::Lock lock(mutex_);
1084 
1085 // this assert is too strong: local ordered transactions may get canceled without
1086 // entering certification    assert(position_ + 1 == seqno || 0 == position_);
1087 
1088     log_info << "####### Adjusting cert position: "
1089              << position_ << " -> " << gtid.seqno();
1090 
1091     if (version != version_)
1092     {
1093         std::for_each(trx_map_.begin(), trx_map_.end(), PurgeAndDiscard(*this));
1094         assert(trx_map_.empty() || trx_map_.rbegin()->first + 1 == position_);
1095         trx_map_.clear();
1096         assert(cert_index_ng_.empty());
1097         if (service_thd_)
1098         {
1099             service_thd_->release_seqno(position_);
1100         }
1101     }
1102 
1103     if (service_thd_)
1104     {
1105         service_thd_->flush(gtid.uuid());
1106     }
1107 
1108     position_       = gtid.seqno();
1109     last_pa_unsafe_ = position_;
1110     version_        = version;
1111     current_view_   = view;
1112 
1113     // Loop over NBO entries, clear state and abort waiters. NBO end waiters
1114     // must resend end messages.
1115     for (NBOMap::iterator i(nbo_map_.begin()); i != nbo_map_.end(); ++i)
1116     {
1117         NBOEntry& e(i->second);
1118         e.clear_ended();
1119         e.nbo_ctx()->set_aborted(true);
1120     }
1121 }
1122 
1123 wsrep_seqno_t
1124 galera::Certification::increment_position()
1125 {
1126     gu::Lock lock(mutex_);
1127     position_++;
1128     return position_;
1129 }
1130 
1131 galera::Certification::TestResult
1132 galera::Certification::test(const TrxHandleSlavePtr& trx, bool store_keys)
1133 {
1134     assert(trx->global_seqno() >= 0 /* && trx->local_seqno() >= 0 */);
1135 
1136     const TestResult ret
1137         (trx->preordered() ?
1138          do_test_preordered(trx.get()) : do_test(trx, store_keys));
1139 
1140     if (gu_unlikely(ret != TEST_OK)) { trx->mark_dummy(); }
1141 
1142     return ret;
1143 }
1144 
1145 
1146 wsrep_seqno_t galera::Certification::get_safe_to_discard_seqno_() const
1147 {
1148     wsrep_seqno_t retval;
1149     if (deps_set_.empty() == true)
1150     {
1151         retval = safe_to_discard_seqno_;
1152     }
1153     else
1154     {
1155         retval = (*deps_set_.begin()) - 1;
1156     }
1157     return retval;
1158 }
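// Editor's example (illustrative): if deps_set_ currently holds the last-seen
// seqnos {17, 21, 30}, then the earliest write set a pending trx may still
// reference is 17, so the safe-to-discard seqno is 17 - 1 == 16.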
1159 
1160 
1161 wsrep_seqno_t
1162 galera::Certification::purge_trxs_upto_(wsrep_seqno_t const seqno,
1163                                         bool const          handle_gcache)
1164 {
1165     assert (seqno > 0);
1166 
1167     TrxMap::iterator purge_bound(trx_map_.upper_bound(seqno));
1168 
1169     cert_debug << "purging index up to " << seqno;
1170 
1171     for_each(trx_map_.begin(), purge_bound, PurgeAndDiscard(*this));
1172     trx_map_.erase(trx_map_.begin(), purge_bound);
1173 
1174     if (handle_gcache && service_thd_) service_thd_->release_seqno(seqno);
1175 
1176     if (0 == ((trx_map_.size() + 1) % 10000))
1177     {
1178         log_debug << "trx map after purge: length: " << trx_map_.size()
1179                   << ", requested purge seqno: " << seqno
1180                   << ", real purge seqno: " << trx_map_.begin()->first - 1;
1181     }
1182 
1183     return seqno;
1184 }
1185 
1186 
1187 galera::Certification::TestResult
1188 galera::Certification::append_trx(const TrxHandleSlavePtr& trx)
1189 {
1190 // explicit ROLLBACK is dummy()    assert(!trx->is_dummy());
1191     assert(trx->global_seqno() >= 0 /* && trx->local_seqno() >= 0 */);
1192     assert(trx->global_seqno() > position_);
1193 
1194 #ifndef NDEBUG
1195     bool const explicit_rollback(trx->explicit_rollback());
1196 #endif /* NDEBUG */
1197 
1198     {
1199         gu::Lock lock(mutex_);
1200 
1201         if (gu_unlikely(trx->global_seqno() != position_ + 1))
1202         {
1203             // this is perfectly normal if trx is rolled back just after
1204             // replication, keeping the log though
1205             log_debug << "seqno gap, position: " << position_
1206                       << " trx seqno " << trx->global_seqno();
1207         }
1208 
1209         if (gu_unlikely((trx->last_seen_seqno() + 1) < trx_map_.begin()->first))
1210         {
1211             /* See #733 - for now it is a false positive */
1212             cert_debug
1213                 << "WARNING: last_seen_seqno is below certification index: "
1214                 << trx_map_.begin()->first << " > " << trx->last_seen_seqno();
1215         }
1216 
1217         position_ = trx->global_seqno();
1218 
1219         if (gu_unlikely(!(position_ & max_length_check_) &&
1220                         (trx_map_.size() > static_cast<size_t>(max_length_))))
1221         {
1222             log_debug << "trx map size: " << trx_map_.size()
1223                       << " - check if status.last_committed is incrementing";
1224 
1225             wsrep_seqno_t       trim_seqno(position_ - max_length_);
1226             wsrep_seqno_t const stds      (get_safe_to_discard_seqno_());
1227 
1228             if (trim_seqno > stds)
1229             {
1230                 log_warn << "Attempt to trim certification index at "
1231                          << trim_seqno << ", above safe-to-discard: " << stds;
1232                 trim_seqno = stds;
1233             }
1234             else
1235             {
1236                 cert_debug << "purging index up to " << trim_seqno;
1237             }
1238 
1239             purge_trxs_upto_(trim_seqno, true);
1240         }
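        // Editor's example of the trimming above (illustrative numbers): with
        // position_ == 50000, max_length_ == 16384 and a safe-to-discard seqno
        // of 40000, trim_seqno starts at 33616 and is used as-is; had the
        // safe-to-discard seqno been 30000, trimming would have been capped
        // at 30000 instead.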
1241     }
1242 
1243     const TestResult retval(test(trx, true));
1244 
1245     {
1246         assert(trx->global_seqno() > 0);
1247 
1248         gu::Lock lock(mutex_);
1249         if (trx_map_.insert(
1250                 std::make_pair(trx->global_seqno(), trx)).second == false)
1251             gu_throw_fatal << "duplicate trx entry " << *trx;
1252 
1253         // trx with local seqno WSREP_SEQNO_UNDEFINED originates from
1254         // IST so deps set tracking should not be done
1255         if (trx->local_seqno() != WSREP_SEQNO_UNDEFINED)
1256         {
1257             assert(trx->last_seen_seqno() != WSREP_SEQNO_UNDEFINED);
1258             deps_set_.insert(trx->last_seen_seqno());
1259             assert(deps_set_.size() <= trx_map_.size());
1260         }
1261     }
1262 
1263     if (!trx->certified()) trx->mark_certified();
1264 
1265 #ifndef NDEBUG
1266     if (explicit_rollback)
1267     {
1268         assert(trx->explicit_rollback());
1269         assert(retval == TEST_OK);
1270         assert(trx->state() == TrxHandle::S_CERTIFYING);
1271     }
1272 #endif /* NDEBUG */
1273 
1274     return retval;
1275 }
1276 
1277 
1278 wsrep_seqno_t galera::Certification::set_trx_committed(TrxHandleSlave& trx)
1279 {
1280     assert(trx.global_seqno() >= 0);
1281     assert(trx.is_committed() == false);
1282 
1283     wsrep_seqno_t ret(WSREP_SEQNO_UNDEFINED);
1284     {
1285         gu::Lock lock(mutex_);
1286 
1287         // certified trx with local seqno WSREP_SEQNO_UNDEFINED originates from
1288         // IST so deps set tracking should not be done
1289         // Local explicit rollback events bypassed certification
1290         if (trx.certified()   == true &&
1291             trx.local_seqno() != WSREP_SEQNO_UNDEFINED &&
1292             !trx.cert_bypass())
1293         {
1294             assert(trx.last_seen_seqno() != WSREP_SEQNO_UNDEFINED);
1295             DepsSet::iterator i(deps_set_.find(trx.last_seen_seqno()));
1296             assert(i != deps_set_.end());
1297 
1298             if (deps_set_.size() == 1) safe_to_discard_seqno_ = *i;
1299 
1300             deps_set_.erase(i);
1301         }
1302 
1303         if (gu_unlikely(index_purge_required()))
1304         {
1305             ret = get_safe_to_discard_seqno_();
1306         }
1307     }
1308 
1309     trx.mark_committed();
1310 
1311     return ret;
1312 }
1313 
1314 void
1315 set_boolean_parameter(bool& param,
1316                       const std::string& value,
1317                       const std::string& param_name,
1318                       const std::string& change_msg)
1319 {
1320     try
1321     {
1322         bool const old(param);
1323         param = gu::Config::from_config<bool>(value);
1324         if (old != param)
1325         {
1326             log_info << (param ? "Enabled " : "Disabled ") << change_msg;
1327         }
1328     }
1329     catch (gu::NotFound& e)
1330     {
1331         gu_throw_error(EINVAL) << "Bad value '" << value
1332                                << "' for boolean parameter '"
1333                                << param_name << '\'';
1334     }
1335 }
1336 
1337 void
1338 galera::Certification::param_set(const std::string& key,
1339                                  const std::string& value)
1340 {
1341     if (key == Certification::PARAM_LOG_CONFLICTS)
1342     {
1343         set_boolean_parameter(log_conflicts_, value, CERT_PARAM_LOG_CONFLICTS,
1344                               "logging of certification conflicts.");
1345     }
1346     else if (key == Certification::PARAM_OPTIMISTIC_PA)
1347     {
1348         set_boolean_parameter(optimistic_pa_, value, CERT_PARAM_OPTIMISTIC_PA,
1349                               "\"optimistic\" parallel applying.");
1350     }
1351     else
1352     {
1353         throw gu::NotFound();
1354     }
1355 
1356     conf_.set(key, value);
1357 }
1358 
1359 void
1360 galera::Certification::mark_inconsistent()
1361 {
1362     gu::Lock lock(mutex_);
1363     assert(!inconsistent_);
1364     inconsistent_ = true;
1365 }
1366