1 //
2 // Copyright (C) 2010-2018 Codership Oy <info@codership.com>
3 //
4
5 #include "certification.hpp"
6
7 #include "gu_lock.hpp"
8 #include "gu_throw.hpp"
9
10 #include <map>
11 #include <algorithm> // std::for_each
12
13
14 using namespace galera;
15
// Compile-time switch for verbose certification tracing. While it is false
// the cert_debug macro below always takes its empty branch, so the streamed
// message is never evaluated.
static const bool cert_debug_on(false);
#define cert_debug                              \
    if (cert_debug_on == false) { }             \
    else log_info << "cert debug: "

// Shorthands for the parameter-name members of galera::Certification; the two
// non-static std::string definitions below expand to definitions of them.
#define CERT_PARAM_LOG_CONFLICTS galera::Certification::PARAM_LOG_CONFLICTS
#define CERT_PARAM_OPTIMISTIC_PA galera::Certification::PARAM_OPTIMISTIC_PA

// Common prefix of every certification parameter name.
static std::string const CERT_PARAM_PREFIX("cert.");

std::string const CERT_PARAM_LOG_CONFLICTS(CERT_PARAM_PREFIX + "log_conflicts");
std::string const CERT_PARAM_OPTIMISTIC_PA(CERT_PARAM_PREFIX + "optimistic_pa");

static std::string const CERT_PARAM_MAX_LENGTH   (CERT_PARAM_PREFIX +
                                                  "max_length");
static std::string const CERT_PARAM_LENGTH_CHECK (CERT_PARAM_PREFIX +
                                                  "length_check");

// Default values, kept as strings in configuration syntax.
static std::string const CERT_PARAM_LOG_CONFLICTS_DEFAULT("no");
static std::string const CERT_PARAM_OPTIMISTIC_PA_DEFAULT("yes");

/*** It is EXTREMELY important that these constants are the same on all nodes.
 *** Don't change them ever!!! ***/
static std::string const CERT_PARAM_MAX_LENGTH_DEFAULT("16384");
static std::string const CERT_PARAM_LENGTH_CHECK_DEFAULT("127");
41
42 void
register_params(gu::Config & cnf)43 galera::Certification::register_params(gu::Config& cnf)
44 {
45 cnf.add(CERT_PARAM_LOG_CONFLICTS, CERT_PARAM_LOG_CONFLICTS_DEFAULT);
46 cnf.add(CERT_PARAM_OPTIMISTIC_PA, CERT_PARAM_OPTIMISTIC_PA_DEFAULT);
47 /* The defaults below are deliberately not reflected in conf: people
48 * should not know about these dangerous setting unless they read RTFM. */
49 cnf.add(CERT_PARAM_MAX_LENGTH);
50 cnf.add(CERT_PARAM_LENGTH_CHECK);
51 }
52
53 /* a function to get around unset defaults in ctor initialization list */
54 static int
max_length(const gu::Config & conf)55 max_length(const gu::Config& conf)
56 {
57 if (conf.is_set(CERT_PARAM_MAX_LENGTH))
58 return conf.get<int>(CERT_PARAM_MAX_LENGTH);
59 else
60 return gu::Config::from_config<int>(CERT_PARAM_MAX_LENGTH_DEFAULT);
61 }
62
63 /* a function to get around unset defaults in ctor initialization list */
64 static int
length_check(const gu::Config & conf)65 length_check(const gu::Config& conf)
66 {
67 if (conf.is_set(CERT_PARAM_LENGTH_CHECK))
68 return conf.get<int>(CERT_PARAM_LENGTH_CHECK);
69 else
70 return gu::Config::from_config<int>(CERT_PARAM_LENGTH_CHECK_DEFAULT);
71 }
72
73 // Purge key set from given index
purge_key_set(galera::Certification::CertIndexNG & cert_index,galera::TrxHandleSlave * ts,const galera::KeySetIn & key_set,const long count)74 static void purge_key_set(galera::Certification::CertIndexNG& cert_index,
75 galera::TrxHandleSlave* ts,
76 const galera::KeySetIn& key_set,
77 const long count)
78 {
79 for (long i(0); i < count; ++i)
80 {
81 const galera::KeySet::KeyPart& kp(key_set.next());
82 galera::KeyEntryNG ke(kp);
83 galera::Certification::CertIndexNG::iterator ci(cert_index.find(&ke));
84 assert(ci != cert_index.end());
85 if (ci == cert_index.end())
86 {
87 log_warn << "Could not find key from index";
88 continue;
89 }
90 galera::KeyEntryNG* const kep(*ci);
91 assert(kep->referenced() == true);
92
93 wsrep_key_type_t const p(kp.wsrep_type(ts->version()));
94
95 if (kep->ref_trx(p) == ts)
96 {
97 kep->unref(p, ts);
98 if (kep->referenced() == false)
99 {
100 cert_index.erase(ci);
101 delete kep;
102 }
103 }
104 }
105 }
106
107 void
purge_for_trx(TrxHandleSlave * trx)108 galera::Certification::purge_for_trx(TrxHandleSlave* trx)
109 {
110 assert(mutex_.owned());
111 assert(trx->version() >= 3 || trx->version() <= 5);
112 const KeySetIn& keys(trx->write_set().keyset());
113 keys.rewind();
114 purge_key_set(cert_index_ng_, trx, keys, keys.count());
115 }
116
117 /* Specifically for chain use in certify_and_depend_v3to5() */
118 template <wsrep_key_type_t REF_KEY_TYPE>
119 bool
check_against(const galera::KeyEntryNG * const found,const galera::KeySet::KeyPart & key,wsrep_key_type_t const key_type,galera::TrxHandleSlave * const trx,bool const log_conflict,wsrep_seqno_t & depends_seqno)120 check_against(const galera::KeyEntryNG* const found,
121 const galera::KeySet::KeyPart& key,
122 wsrep_key_type_t const key_type,
123 galera::TrxHandleSlave* const trx,
124 bool const log_conflict,
125 wsrep_seqno_t& depends_seqno)
126 {
127 enum CheckType
128 {
129 CONFLICT,
130 DEPENDENCY,
131 NOTHING
132 };
133
134 static CheckType const check_table
135 [WSREP_KEY_EXCLUSIVE+1][WSREP_KEY_EXCLUSIVE+1] =
136 {
137 // SH RE UP EX second / first
138 { NOTHING, NOTHING, DEPENDENCY, DEPENDENCY }, // SH
139 { NOTHING, NOTHING, DEPENDENCY, CONFLICT }, // RE
140 { DEPENDENCY, DEPENDENCY, CONFLICT, CONFLICT }, // UP
141 { CONFLICT, CONFLICT, CONFLICT, CONFLICT } // EX
142 };
143
144 const galera::TrxHandleSlave* const ref_trx(found->ref_trx(REF_KEY_TYPE));
145
146 // trx should not have any references in index at this point
147 assert(ref_trx != trx);
148
149 bool conflict(false);
150
151 if (gu_likely(0 != ref_trx))
152 {
153 if ((REF_KEY_TYPE == WSREP_KEY_EXCLUSIVE ||
154 REF_KEY_TYPE == WSREP_KEY_UPDATE) && ref_trx)
155 {
156 cert_debug << KeySet::type(REF_KEY_TYPE) << " match: "
157 << *trx << " <---> " << *ref_trx;
158 }
159
160 if (REF_KEY_TYPE == WSREP_KEY_SHARED ||
161 REF_KEY_TYPE == WSREP_KEY_REFERENCE ||
162 REF_KEY_TYPE == WSREP_KEY_UPDATE) assert(!ref_trx->is_toi());
163
164 CheckType const check_type(check_table[REF_KEY_TYPE][key_type]);
165
166 switch (check_type)
167 {
168 case CONFLICT:
169 // cert conflict takes place if
170 // 1) write sets originated from different nodes, are within cert
171 // range
172 // 2) ref_trx is in isolation mode, write sets are within cert range
173 // 3) Trx has not been certified yet. Already certified trxs show up
174 // here during index rebuild.
175 conflict = (ref_trx->global_seqno() > trx->last_seen_seqno() &&
176 (ref_trx->is_toi() ||
177 trx->source_id() != ref_trx->source_id()) &&
178 trx->certified() == false);
179
180 if (gu_unlikely(cert_debug_on || (conflict && log_conflict == true)))
181 {
182 log_info << KeySet::type(key_type) << '-'
183 << KeySet::type(REF_KEY_TYPE)
184 << " trx " << (conflict ? "conflict" : "match")
185 << " for key " << key << ": "
186 << *trx << " <---> " << *ref_trx;
187 }
188 /* fall through */
189 case DEPENDENCY:
190 depends_seqno = std::max(ref_trx->global_seqno(), depends_seqno);
191 /* fall through */
192 case NOTHING:;
193 }
194 }
195
196 return conflict;
197 }
198
199 /*! for convenience returns true if conflict and false if not */
200 static inline bool
certify_and_depend_v3to5(const galera::KeyEntryNG * const found,const galera::KeySet::KeyPart & key,galera::TrxHandleSlave * const trx,bool const log_conflict)201 certify_and_depend_v3to5(const galera::KeyEntryNG* const found,
202 const galera::KeySet::KeyPart& key,
203 galera::TrxHandleSlave* const trx,
204 bool const log_conflict)
205 {
206 bool ret(false);
207 wsrep_seqno_t depends_seqno(trx->depends_seqno());
208 wsrep_key_type_t const key_type(key.wsrep_type(trx->version()));
209
210 /*
211 * The following cascade implements these rules:
212 *
213 * | ex | up | re | sh | <- horizontal axis: trx key type
214 * ------------------------ vertical axis: found key type
215 * ex | C | C | C | C |
216 * ------------------------
217 * up | C | C | D | D |
218 * ------------------------ C - conflict
219 * re | C | D | N | N | D - dependency
220 * ------------------------ N - nothing
221 * sh | D | D | N | N |
222 * ------------------------
223 *
224 * Note that depends_seqno is an in/out parameter and is updated on every
225 * step.
226 */
227 if (check_against<WSREP_KEY_EXCLUSIVE>
228 (found, key, key_type, trx, log_conflict, depends_seqno) ||
229 check_against<WSREP_KEY_UPDATE>
230 (found, key, key_type, trx, log_conflict, depends_seqno) ||
231 (key_type >= WSREP_KEY_UPDATE &&
232 /* exclusive keys must be checked against shared */
233 (check_against<WSREP_KEY_REFERENCE>
234 (found, key, key_type, trx, log_conflict, depends_seqno) ||
235 check_against<WSREP_KEY_SHARED>
236 (found, key, key_type, trx, log_conflict, depends_seqno))))
237 {
238 ret = true;
239 }
240
241 if (depends_seqno > trx->depends_seqno())
242 trx->set_depends_seqno(depends_seqno);
243
244 return ret;
245 }
246
247 /* returns true on collision, false otherwise */
248 static bool
certify_v3to5(galera::Certification::CertIndexNG & cert_index_ng,const galera::KeySet::KeyPart & key,galera::TrxHandleSlave * const trx,bool const store_keys,bool const log_conflicts)249 certify_v3to5(galera::Certification::CertIndexNG& cert_index_ng,
250 const galera::KeySet::KeyPart& key,
251 galera::TrxHandleSlave* const trx,
252 bool const store_keys,
253 bool const log_conflicts)
254 {
255 galera::KeyEntryNG ke(key);
256 galera::Certification::CertIndexNG::iterator ci(cert_index_ng.find(&ke));
257
258 if (cert_index_ng.end() == ci)
259 {
260 if (store_keys)
261 {
262 galera::KeyEntryNG* const kep(new galera::KeyEntryNG(ke));
263 ci = cert_index_ng.insert(kep).first;
264
265 cert_debug << "created new entry";
266 }
267 return false;
268 }
269 else
270 {
271 cert_debug << "found existing entry";
272
273 galera::KeyEntryNG* const kep(*ci);
274 // Note: For we skip certification for isolated trxs, only
275 // cert index and key_list is populated.
276 return (!trx->is_toi() &&
277 certify_and_depend_v3to5(kep, key, trx, log_conflicts));
278 }
279 }
280
281 // Add key to trx references for trx that passed certification.
282 //
283 // @param cert_index certification index in use
284 // @param trx certified transaction
285 // @param key_set key_set used in certification
286 // @param key_count number of keys in key set
do_ref_keys(galera::Certification::CertIndexNG & cert_index,galera::TrxHandleSlave * const trx,const galera::KeySetIn & key_set,const long key_count)287 static void do_ref_keys(galera::Certification::CertIndexNG& cert_index,
288 galera::TrxHandleSlave* const trx,
289 const galera::KeySetIn& key_set,
290 const long key_count)
291 {
292 for (long i(0); i < key_count; ++i)
293 {
294 const galera::KeySet::KeyPart& k(key_set.next());
295 galera::KeyEntryNG ke(k);
296 galera::Certification::CertIndexNG::const_iterator
297 ci(cert_index.find(&ke));
298
299 if (ci == cert_index.end())
300 {
301 gu_throw_fatal << "could not find key '" << k
302 << "' from cert index";
303 }
304 (*ci)->ref(k.wsrep_type(trx->version()), k, trx);
305 }
306 }
307
308 // Clean up keys from index that were added by trx that failed
309 // certification.
310 //
311 // @param cert_index certification inde
312 // @param key_set key_set used in certification
313 // @param processed number of keys that were processed in certification
do_clean_keys(galera::Certification::CertIndexNG & cert_index,const galera::TrxHandleSlave * const trx,const galera::KeySetIn & key_set,const long processed)314 static void do_clean_keys(galera::Certification::CertIndexNG& cert_index,
315 const galera::TrxHandleSlave* const trx,
316 const galera::KeySetIn& key_set,
317 const long processed)
318 {
319 /* 'strictly less' comparison is essential in the following loop:
320 * processed key failed cert and was not added to index */
321 for (long i(0); i < processed; ++i)
322 {
323 KeyEntryNG ke(key_set.next());
324
325 // Clean up cert index from entries which were added by this trx
326 galera::Certification::CertIndexNG::iterator ci(cert_index.find(&ke));
327
328 if (gu_likely(ci != cert_index.end()))
329 {
330 galera::KeyEntryNG* kep(*ci);
331
332 if (kep->referenced() == false)
333 {
334 // kel was added to cert_index_ by this trx -
335 // remove from cert_index_ and fall through to delete
336 cert_index.erase(ci);
337 }
338 else continue;
339
340 assert(kep->referenced() == false);
341
342 delete kep;
343 }
344 else if(ke.key().wsrep_type(trx->version()) == WSREP_KEY_SHARED)
345 {
346 assert(0); // we actually should never be here, the key should
347 // be either added to cert_index_ or be there already
348 log_warn << "could not find shared key '"
349 << ke.key() << "' from cert index";
350 }
351 else { /* non-shared keys can duplicate shared in the key set */ }
352 }
353 }
354
355 galera::Certification::TestResult
do_test_v3to5(TrxHandleSlave * trx,bool store_keys)356 galera::Certification::do_test_v3to5(TrxHandleSlave* trx, bool store_keys)
357 {
358 cert_debug << "BEGIN CERTIFICATION v" << trx->version() << ": " << *trx;
359
360 #ifndef NDEBUG
361 // to check that cleanup after cert failure returns cert_index
362 // to original size
363 size_t prev_cert_index_size(cert_index_ng_.size());
364 #endif // NDEBUG
365
366 const KeySetIn& key_set(trx->write_set().keyset());
367 long const key_count(key_set.count());
368 long processed(0);
369
370 key_set.rewind();
371
372 for (; processed < key_count; ++processed)
373 {
374 const KeySet::KeyPart& key(key_set.next());
375
376 if (certify_v3to5(cert_index_ng_, key, trx, store_keys, log_conflicts_))
377 {
378 trx->set_depends_seqno(std::max(trx->depends_seqno(), last_pa_unsafe_));
379 goto cert_fail;
380 }
381 }
382
383 trx->set_depends_seqno(std::max(trx->depends_seqno(), last_pa_unsafe_));
384
385 if (store_keys == true)
386 {
387 assert (key_count == processed);
388 key_set.rewind();
389 do_ref_keys(cert_index_ng_, trx, key_set, key_count);
390
391 if (trx->pa_unsafe()) last_pa_unsafe_ = trx->global_seqno();
392
393 key_count_ += key_count;
394 }
395 cert_debug << "END CERTIFICATION (success): " << *trx;
396 return TEST_OK;
397
398 cert_fail:
399
400 cert_debug << "END CERTIFICATION (failed): " << *trx;
401
402 assert (processed < key_count);
403
404 if (store_keys == true)
405 {
406 /* Clean up key entries allocated for this trx */
407 key_set.rewind();
408 do_clean_keys(cert_index_ng_, trx, key_set, processed);
409 assert(cert_index_ng_.size() == prev_cert_index_size);
410 }
411
412 return TEST_FAILED;
413 }
414
/* Tells whether a writeset of version trx_version can be certified under the
 * certification protocol cert_version currently established in the group.
 *
 * Certification protocols up to 3 accept only writesets of exactly their own
 * version. Protocols above 3 accept every writeset version from 3 up to and
 * including the protocol version. */
static inline bool
trx_cert_version_match(int const trx_version, int const cert_version)
{
    int const oldest_accepted(cert_version <= 3 ? cert_version : 3);

    return (trx_version >= oldest_accepted && trx_version <= cert_version);
}
432
433 galera::Certification::TestResult
do_test(const TrxHandleSlavePtr & trx,bool store_keys)434 galera::Certification::do_test(const TrxHandleSlavePtr& trx, bool store_keys)
435 {
436 assert(trx->source_id() != WSREP_UUID_UNDEFINED);
437
438 if (!trx_cert_version_match(trx->version(), version_))
439 {
440 log_warn << "trx protocol version: "
441 << trx->version()
442 << " does not match certification protocol version: "
443 << version_;
444 return TEST_FAILED;
445 }
446
447 // trx->is_certified() == true during index rebuild from IST, do_test()
448 // must not fail, just populate index
449 if (gu_unlikely(trx->certified() == false &&
450 (trx->last_seen_seqno() < initial_position_ ||
451 trx->global_seqno()-trx->last_seen_seqno() > max_length_)))
452 {
453 if (trx->global_seqno() - trx->last_seen_seqno() > max_length_)
454 {
455 log_warn << "certification interval for trx " << *trx
456 << " exceeds the limit of " << max_length_;
457 }
458
459 return TEST_FAILED;
460 }
461
462 TestResult res(TEST_FAILED);
463
464 gu::Lock lock(mutex_); // why do we need that? - e.g. set_trx_committed()
465
466 /* initialize parent seqno */
467 if (gu_unlikely(trx_map_.empty()))
468 {
469 trx->set_depends_seqno(trx->global_seqno() - 1);
470 }
471 else
472 {
473 if (optimistic_pa_ == false &&
474 trx->last_seen_seqno() > trx->depends_seqno())
475 trx->set_depends_seqno(trx->last_seen_seqno());
476
477 wsrep_seqno_t const ds(trx_map_.begin()->first - 1);
478 if (ds > trx->depends_seqno()) trx->set_depends_seqno(ds);
479 }
480
481 switch (version_)
482 {
483 case 1:
484 case 2:
485 break;
486 case 3:
487 case 4:
488 case 5:
489 res = do_test_v3to5(trx.get(), store_keys);
490 break;
491 default:
492 gu_throw_fatal << "certification test for version "
493 << version_ << " not implemented";
494 }
495
496 if (store_keys == true && res == TEST_OK)
497 {
498 ++trx_count_;
499 gu::Lock lock(stats_mutex_);
500 ++n_certified_;
501 deps_dist_ += (trx->global_seqno() - trx->depends_seqno());
502 cert_interval_ += (trx->global_seqno() - trx->last_seen_seqno() - 1);
503 index_size_ = cert_index_ng_.size();
504 }
505
506 // Additional NBO certification.
507 if (trx->flags() & TrxHandle::F_ISOLATION)
508 {
509 res = do_test_nbo(trx);
510 assert(TEST_FAILED == res || trx->depends_seqno() >= 0);
511 }
512
513 byte_count_ += trx->size();
514
515 return res;
516 }
517
518
519 galera::Certification::TestResult
do_test_preordered(TrxHandleSlave * trx)520 galera::Certification::do_test_preordered(TrxHandleSlave* trx)
521 {
522 /* Source ID is not always available for preordered events (e.g. event
523 * producer didn't provide any) so for now we must accept undefined IDs. */
524 //assert(trx->source_id() != WSREP_UUID_UNDEFINED);
525
526 assert(trx->version() >= 3);
527 assert(trx->preordered());
528
529 /* we don't want to go any further unless the writeset checksum is ok */
530 trx->verify_checksum(); // throws
531 /* if checksum failed we need to throw ASAP, let the caller catch it,
532 * flush monitors, save state and abort. */
533
534 /* This is a primitive certification test for preordered actions:
535 * it does not handle gaps and relies on general apply monitor for
536 * parallel applying. Ideally there should be a certification object
537 * per source. */
538
539 if (gu_unlikely(last_preordered_id_ &&
540 (last_preordered_id_ + 1 != trx->trx_id())))
541 {
542 log_warn << "Gap in preordered stream: source_id '" << trx->source_id()
543 << "', trx_id " << trx->trx_id() << ", previous id "
544 << last_preordered_id_;
545 assert(0);
546 }
547
548 trx->set_depends_seqno(last_preordered_seqno_ + 1 -
549 trx->write_set().pa_range());
550 // +1 compensates for subtracting from a previous seqno, rather than own.
551 trx->mark_certified();
552
553 last_preordered_seqno_ = trx->global_seqno();
554 last_preordered_id_ = trx->trx_id();
555
556 return TEST_OK;
557 }
558
559
560 //
561 // non-blocking operations
562 //
563
564 // Prepare a copy of TrxHandleSlave with private storage
copy_ts(galera::TrxHandleSlave * ts,galera::TrxHandleSlave::Pool & pool,gu::shared_ptr<NBOCtx>::type nbo_ctx)565 galera::NBOEntry copy_ts(
566 galera::TrxHandleSlave* ts,
567 galera::TrxHandleSlave::Pool& pool,
568 gu::shared_ptr<NBOCtx>::type nbo_ctx)
569 {
570 // FIXME: Pass proper working directory from config to MappedBuffer ctor
571 gu::shared_ptr<galera::MappedBuffer>::type buf(
572 new galera::MappedBuffer("/tmp"));
573 assert(ts->action().first && ts->action().second);
574 if (ts->action().first == 0)
575 {
576 gu_throw_fatal
577 << "Unassigned action pointer for transaction, cannot make a copy of: "
578 << *ts;
579 }
580
581 buf->resize(ts->action().second);
582 std::copy(static_cast<const gu::byte_t*>(ts->action().first),
583 static_cast<const gu::byte_t*>(ts->action().first)
584 + ts->action().second,
585 buf->begin());
586
587 galera::TrxHandleSlaveDeleter d;
588 gu::shared_ptr<galera::TrxHandleSlave>::type new_ts(
589 galera::TrxHandleSlave::New(ts->local(), pool), d);
590 if (buf->size() > size_t(std::numeric_limits<int32_t>::max()))
591 gu_throw_error(ERANGE) << "Buffer size " << buf->size()
592 << " out of range";
593 gcs_action act = {ts->global_seqno(), ts->local_seqno(),
594 &(*buf)[0], static_cast<int32_t>(buf->size()),
595 GCS_ACT_WRITESET};
596 if (ts->certified() == false)
597 {
598 // TrxHandleSlave is from group
599 gu_trace(new_ts->unserialize<true>(act));
600 }
601 else
602 {
603 // TrxHandleSlave is from IST
604 gu_trace(new_ts->unserialize<false>(act));
605 }
606 new_ts->set_local(ts->local());
607 return galera::NBOEntry(new_ts, buf, nbo_ctx);
608 }
609
610
purge_key_set_nbo(galera::Certification::CertIndexNBO & cert_index,bool is_nbo_index,galera::TrxHandleSlave * ts,const galera::KeySetIn & key_set,const long count)611 static void purge_key_set_nbo(galera::Certification::CertIndexNBO& cert_index,
612 bool is_nbo_index,
613 galera::TrxHandleSlave* ts,
614 const galera::KeySetIn& key_set,
615 const long count)
616 {
617 using galera::Certification;
618 using galera::KeyEntryNG;
619 using galera::KeySet;
620
621 key_set.rewind();
622
623 for (long i(0); i < count; ++i)
624 {
625 KeyEntryNG ke(key_set.next());
626 std::pair<Certification::CertIndexNBO::iterator,
627 Certification::CertIndexNBO::iterator>
628 ci_range(cert_index.equal_range(&ke));
629
630 assert(std::distance(ci_range.first, ci_range.second) >= 1);
631
632 wsrep_key_type_t const p(ke.key().wsrep_type(ts->version()));
633 Certification::CertIndexNBO::iterator ci;
634 for (ci = ci_range.first; ci != ci_range.second; ++ci)
635 {
636 if ((*ci)->ref_trx(p) == ts) break;
637 }
638 assert(ci != ci_range.second);
639 if (ci == ci_range.second)
640 {
641 log_warn << "purge_key_set_nbo(): Could not find key "
642 << ke.key() << " from NBO index, skipping";
643 continue;
644 }
645
646 KeyEntryNG* const kep(*ci);
647 assert(kep->referenced() == true);
648
649 kep->unref(p, ts);
650 assert(kep->referenced() == false);
651 cert_index.erase(ci);
652 delete kep;
653 }
654 }
655
656
end_nbo(galera::NBOMap::iterator i,galera::TrxHandleSlavePtr ts,galera::Certification::CertIndexNBO & nbo_index,galera::NBOMap & nbo_map)657 static void end_nbo(galera::NBOMap::iterator i,
658 galera::TrxHandleSlavePtr ts,
659 galera::Certification::CertIndexNBO& nbo_index,
660 galera::NBOMap& nbo_map)
661 {
662 NBOEntry& e(i->second);
663
664 log_debug << "Ending NBO started by " << *e.ts_ptr();
665
666 // Erase entry from index
667 const KeySetIn& key_set(e.ts_ptr()->write_set().keyset());
668 purge_key_set_nbo(nbo_index, true, e.ts_ptr(), key_set, key_set.count());
669
670 ts->set_ends_nbo(e.ts_ptr()->global_seqno());
671
672 nbo_map.erase(i);
673 }
674
675
nbo_ctx_unlocked(wsrep_seqno_t const seqno)676 gu::shared_ptr<NBOCtx>::type galera::Certification::nbo_ctx_unlocked(
677 wsrep_seqno_t const seqno)
678 {
679 // This will either
680 // * Insert a new NBOCtx shared_ptr into ctx map if one didn't exist
681 // before, or
682 // * Return existing entry, while newly created shared ptr gets freed
683 // automatically when it goes out of scope
684 return nbo_ctx_map_.insert(
685 std::make_pair(seqno,
686 gu::make_shared<NBOCtx>())).first->second;
687 }
688
nbo_ctx(wsrep_seqno_t const seqno)689 gu::shared_ptr<NBOCtx>::type galera::Certification::nbo_ctx(
690 wsrep_seqno_t const seqno)
691 {
692 assert(seqno > 0);
693 gu::Lock lock(mutex_);
694 return nbo_ctx_unlocked(seqno);
695 }
696
erase_nbo_ctx(wsrep_seqno_t const seqno)697 void galera::Certification::erase_nbo_ctx(wsrep_seqno_t const seqno)
698 {
699 assert(seqno > 0);
700 gu::Lock lock(mutex_);
701
702 size_t n_erased(nbo_ctx_map_.erase(seqno));
703 assert(n_erased == 1); (void)n_erased;
704 }
705
706
is_exclusive(const galera::KeyEntryNG * ke)707 static bool is_exclusive(const galera::KeyEntryNG* ke)
708 {
709 assert(ke != 0);
710 assert((ke->ref_trx(WSREP_KEY_SHARED) ||
711 ke->ref_trx(WSREP_KEY_REFERENCE) ||
712 ke->ref_trx(WSREP_KEY_UPDATE) ||
713 ke->ref_trx(WSREP_KEY_EXCLUSIVE)) &&
714 !(ke->ref_trx(WSREP_KEY_SHARED) &&
715 ke->ref_trx(WSREP_KEY_REFERENCE) &&
716 ke->ref_trx(WSREP_KEY_UPDATE) &&
717 ke->ref_trx(WSREP_KEY_EXCLUSIVE)));
718 return (ke->ref_trx(WSREP_KEY_EXCLUSIVE) != 0 ||
719 ke->ref_trx(WSREP_KEY_UPDATE) != 0);
720 }
721
722 static bool
certify_nbo(galera::Certification::CertIndexNBO & cert_index,const galera::KeySet::KeyPart & key,galera::TrxHandleSlave * const trx,bool const log_conflicts)723 certify_nbo(galera::Certification::CertIndexNBO& cert_index,
724 const galera::KeySet::KeyPart& key,
725 galera::TrxHandleSlave* const trx,
726 bool const log_conflicts)
727 {
728 using galera::KeyEntryNG;
729 using galera::Certification;
730 using galera::TrxHandleSlave;
731
732 KeyEntryNG ke(key);
733 std::pair<Certification::CertIndexNBO::iterator,
734 Certification::CertIndexNBO::iterator>
735 it(cert_index.equal_range(&ke));
736
737 // Certification is done over whole index as opposed to regular
738 // write set certification where only given range is used
739
740 // If found range is non-empty, it must be either single exclusive
741 // key or all shared.
742 assert(std::count_if(it.first, it.second, is_exclusive) == 0 ||
743 std::distance(it.first, it.second) == 1);
744
745 Certification::CertIndexNBO::iterator i;
746 if ((i = std::find_if(it.first, it.second, is_exclusive)) != cert_index.end())
747 {
748 if (gu_unlikely(log_conflicts == true))
749 {
750 const TrxHandleSlave* ref_trx((*i)->ref_trx(WSREP_KEY_EXCLUSIVE));
751 assert(ref_trx != 0);
752 log_info << "NBO conflict for key " << key << ": "
753 << *trx << " <--X--> " << *ref_trx;
754 }
755 return true;
756 }
757 return false;
758 }
759
760 static void
do_ref_keys_nbo(galera::Certification::CertIndexNBO & index,TrxHandleSlave * const trx,const galera::KeySetIn & key_set,const long key_count)761 do_ref_keys_nbo(galera::Certification::CertIndexNBO& index,
762 TrxHandleSlave* const trx,
763 const galera::KeySetIn& key_set,
764 const long key_count)
765 {
766 using galera::KeySet;
767 using galera::KeyEntryNG;
768 using galera::Certification;
769
770 key_set.rewind();
771
772 for (long i(0); i < key_count; ++i)
773 {
774 const KeySet::KeyPart& key(key_set.next());
775 wsrep_key_type_t const type(key.wsrep_type(trx->version()));
776 KeyEntryNG* kep (new KeyEntryNG(key));
777 Certification::CertIndexNBO::iterator it;
778 assert((it = index.find(kep)) == index.end() ||
779 (*it)->ref_trx(type) != trx);
780 it = index.insert(kep);
781 (*it)->ref(type, key, trx);
782 }
783 }
784
785
do_test_nbo(const TrxHandleSlavePtr & ts)786 galera::Certification::TestResult galera::Certification::do_test_nbo(
787 const TrxHandleSlavePtr& ts)
788 {
789 assert(!ts->is_dummy());
790 assert(ts->flags() & TrxHandle::F_ISOLATION);
791 assert(ts->flags() & (TrxHandle::F_BEGIN | TrxHandle::F_COMMIT));
792
793 if (nbo_position_ >= ts->global_seqno())
794 {
795 // This is part of cert index preload, needs to be dropped since
796 // it is already processed by this node before partitioning.
797 assert(ts->certified() == true);
798 // Return TEST_OK. If the trx is in index preload, it has
799 // passed certification on donor.
800 log_debug << "Dropping NBO " << *ts;
801 return TEST_OK;
802 }
803 nbo_position_ = ts->global_seqno();
804
805 #ifndef NDEBUG
806 size_t prev_nbo_index_size(nbo_index_.size());
807 #endif // NDEBUG
808
809 bool ineffective(false);
810
811 galera::Certification::TestResult ret(TEST_OK);
812 if ((ts->flags() & TrxHandle::F_BEGIN) &&
813 (ts->flags() & TrxHandle::F_COMMIT))
814 {
815 // Old school atomic TOI
816 log_debug << "TOI: " << *ts;
817 const KeySetIn& key_set(ts->write_set().keyset());
818 long const key_count(key_set.count());
819 long processed(0);
820
821 key_set.rewind();
822 for (; processed < key_count; ++processed)
823 {
824 const KeySet::KeyPart& key(key_set.next());
825 if (certify_nbo(nbo_index_, key, ts.get(), log_conflicts_))
826 {
827 ret = TEST_FAILED;
828 break;
829 }
830 }
831 log_debug << "NBO test result " << ret << " for TOI " << *ts;
832 // Atomic TOI should not cause change in NBO index
833 assert(prev_nbo_index_size == nbo_index_.size());
834 }
835 else if (ts->flags() & TrxHandle::F_BEGIN)
836 {
837 // Beginning of non-blocking operation
838 log_debug << "NBO start: " << *ts;
839 // We need a copy of ts since the lifetime of NBO may exceed
840 // the lifetime of the buffer in GCache
841 NBOEntry entry(copy_ts(ts.get(), nbo_pool_, nbo_ctx_unlocked(
842 ts->global_seqno())));
843
844 TrxHandleSlave* new_ts(entry.ts_ptr());
845 const KeySetIn& key_set(new_ts->write_set().keyset());
846 long const key_count(key_set.count());
847 long processed(0);
848
849 key_set.rewind();
850 for (; processed < key_count; ++processed)
851 {
852 const KeySet::KeyPart& key(key_set.next());
853 if (certify_nbo(nbo_index_, key, new_ts, log_conflicts_))
854 {
855 ret = TEST_FAILED;
856 break;
857 }
858 }
859
860 switch (ret)
861 {
862 case TEST_OK:
863 do_ref_keys_nbo(nbo_index_, new_ts, key_set, key_count);
864 nbo_map_.insert(std::make_pair(new_ts->global_seqno(),
865 entry));
866 break;
867 case TEST_FAILED:
868 // Clean keys not needed here since certify_nbo()
869 // does not insert them into nbo_index_
870 break;
871 }
872 }
873 else
874 {
875 assert(ts->nbo_end());
876 // End of non-blocking operation
877 log_debug << "NBO end: " << *ts;
878 ineffective = true;
879
880 NBOKey key;
881 const DataSetIn& ws(ts->write_set().dataset());
882 ws.rewind();
883 assert(ws.count() == 1);
884 if (ws.count() != 1) gu_throw_fatal << "Invalid dataset count in "
885 << *ts;
886 gu::Buf buf(ws.next());
887 key.unserialize(static_cast<const gu::byte_t*>(buf.ptr), buf.size, 0);
888
889 NBOMap::iterator i(nbo_map_.find(key));
890 if (i != nbo_map_.end())
891 {
892 NBOEntry& e(i->second);
893 e.add_ended(ts->source_id());
894 if (ts->local() == true)
895 {
896 // Clear NBO context aborted flag if it is set by
897 // intermediate view change.
898 e.nbo_ctx()->set_aborted(false);
899 }
900
901 if (current_view_.subset_of(e.ended_set()))
902 {
903 // All nodes of the current primary view have
904 // ended the operation.
905 end_nbo(i, ts, nbo_index_, nbo_map_);
906 ineffective = false;
907 }
908 }
909 else
910 {
911 log_warn << "no corresponding NBO begin found for NBO end " << *ts;
912 }
913 }
914
915 if (gu_likely(TEST_OK == ret))
916 {
917 ts->set_depends_seqno(ts->global_seqno() - 1);
918 if (gu_unlikely(ineffective))
919 {
920 assert(ts->nbo_end());
921 assert(ts->ends_nbo() == WSREP_SEQNO_UNDEFINED);
922 ret = TEST_FAILED;
923 }
924 }
925
926 assert(TEST_FAILED == ret || ts->depends_seqno() >= 0);
927
928 return ret;
929 }
930
// Constructs a certification object with empty indexes.
//
// @param conf configuration; cert.log_conflicts and cert.optimistic_pa are
//             read from it directly, cert.max_length and cert.length_check
//             through the helpers that fall back to built-in defaults
// @param thd  service thread, stored as is in service_thd_
galera::Certification::Certification(gu::Config& conf, ServiceThd* thd)
    :
    version_               (-1),
    conf_                  (conf),
    trx_map_               (),
    cert_index_ng_         (),
    nbo_map_               (),
    nbo_ctx_map_           (),
    nbo_index_             (),
    nbo_pool_              (sizeof(TrxHandleSlave)),
    deps_set_              (),
    current_view_          (),
    service_thd_           (thd),
    mutex_                 (),
    trx_size_warn_count_   (0),
    // all positions start at -1
    initial_position_      (-1),
    position_              (-1),
    nbo_position_          (-1),
    safe_to_discard_seqno_ (-1),
    last_pa_unsafe_        (-1),
    // NOTE(review): reads position_ here - presumably position_ is declared
    // before last_preordered_seqno_ in the class; confirm in the header
    last_preordered_seqno_ (position_),
    last_preordered_id_    (0),
    stats_mutex_           (),
    // statistics counters
    n_certified_           (0),
    deps_dist_             (0),
    cert_interval_         (0),
    index_size_            (0),
    key_count_             (0),
    byte_count_            (0),
    trx_count_             (0),

    // values taken from configuration
    max_length_            (max_length(conf)),
    max_length_check_      (length_check(conf)),
    inconsistent_          (false),
    log_conflicts_         (conf.get<bool>(CERT_PARAM_LOG_CONFLICTS)),
    optimistic_pa_         (conf.get<bool>(CERT_PARAM_OPTIMISTIC_PA))
{}
968
969
~Certification()970 galera::Certification::~Certification()
971 {
972 log_info << "cert index usage at exit " << cert_index_ng_.size();
973 log_info << "cert trx map usage at exit " << trx_map_.size();
974 log_info << "deps set usage at exit " << deps_set_.size();
975
976 double avg_cert_interval(0);
977 double avg_deps_dist(0);
978 size_t index_size(0);
979 stats_get(avg_cert_interval, avg_deps_dist, index_size);
980 log_info << "avg deps dist " << avg_deps_dist;
981 log_info << "avg cert interval " << avg_cert_interval;
982 log_info << "cert index size " << index_size;
983
984 gu::Lock lock(mutex_);
985
986 for_each(trx_map_.begin(), trx_map_.end(), PurgeAndDiscard(*this));
987 trx_map_.clear();
988 nbo_map_.clear();
989 std::for_each(nbo_index_.begin(), nbo_index_.end(),
990 [](CertIndexNBO::value_type key_entry)
991 {
992 for (int i(0); i <= KeySet::Key::TYPE_MAX; ++i)
993 {
994 wsrep_key_type_t key_type(static_cast<wsrep_key_type_t>(i));
995 const TrxHandleSlave* ts(key_entry->ref_trx(key_type));
996 if (ts)
997 {
998 key_entry->unref(key_type, ts);
999 }
1000 }
1001 delete key_entry;
1002 });
1003 if (service_thd_)
1004 {
1005 service_thd_->release_seqno(position_);
1006 service_thd_->flush(gu::UUID());
1007 }
1008 }
1009
1010
assign_initial_position(const gu::GTID & gtid,int const version)1011 void galera::Certification::assign_initial_position(const gu::GTID& gtid,
1012 int const version)
1013 {
1014 assert(gtid.seqno() >= 0 || gtid == gu::GTID());
1015 switch (version)
1016 {
1017 // value -1 used in initialization when trx protocol version is not
1018 // available
1019 case -1:
1020 case 1:
1021 case 2:
1022 case 3:
1023 case 4:
1024 case 5:
1025 break;
1026 default:
1027 gu_throw_fatal << "certification/trx version "
1028 << version << " not supported";
1029 }
1030
1031 wsrep_seqno_t const seqno(gtid.seqno());
1032 gu::Lock lock(mutex_);
1033
1034 std::for_each(trx_map_.begin(), trx_map_.end(), PurgeAndDiscard(*this));
1035
1036 if (seqno >= position_)
1037 {
1038 assert(cert_index_ng_.size() == 0);
1039 }
1040 else
1041 {
1042 if (seqno > 0) // don't warn on index reset.
1043 {
1044 log_warn << "moving position backwards: " << position_ << " -> "
1045 << seqno;
1046 }
1047
1048 std::for_each(cert_index_ng_.begin(), cert_index_ng_.end(),
1049 gu::DeleteObject());
1050 cert_index_ng_.clear();
1051 }
1052
1053 trx_map_.clear();
1054 assert(cert_index_ng_.empty());
1055
1056 if (service_thd_)
1057 {
1058 service_thd_->release_seqno(position_);
1059 service_thd_->flush(gtid.uuid());
1060 }
1061
1062 log_info << "####### Assign initial position for certification: " << gtid
1063 << ", protocol version: " << version;
1064
1065 initial_position_ = seqno;
1066 position_ = seqno;
1067 safe_to_discard_seqno_ = seqno;
1068 last_pa_unsafe_ = seqno;
1069 last_preordered_seqno_ = position_;
1070 last_preordered_id_ = 0;
1071 version_ = version;
1072 }
1073
1074
1075 void
adjust_position(const View & view,const gu::GTID & gtid,int const version)1076 galera::Certification::adjust_position(const View& view,
1077 const gu::GTID& gtid,
1078 int const version)
1079 {
1080 assert(gtid.uuid() != GU_UUID_NIL);
1081 assert(gtid.seqno() >= 0);
1082
1083 gu::Lock lock(mutex_);
1084
1085 // this assert is too strong: local ordered transactions may get canceled without
1086 // entering certification assert(position_ + 1 == seqno || 0 == position_);
1087
1088 log_info << "####### Adjusting cert position: "
1089 << position_ << " -> " << gtid.seqno();
1090
1091 if (version != version_)
1092 {
1093 std::for_each(trx_map_.begin(), trx_map_.end(), PurgeAndDiscard(*this));
1094 assert(trx_map_.empty() || trx_map_.rbegin()->first + 1 == position_);
1095 trx_map_.clear();
1096 assert(cert_index_ng_.empty());
1097 if (service_thd_)
1098 {
1099 service_thd_->release_seqno(position_);
1100 }
1101 }
1102
1103 if (service_thd_)
1104 {
1105 service_thd_->flush(gtid.uuid());
1106 }
1107
1108 position_ = gtid.seqno();
1109 last_pa_unsafe_ = position_;
1110 version_ = version;
1111 current_view_ = view;
1112
1113 // Loop over NBO entries, clear state and abort waiters. NBO end waiters
1114 // must resend end messages.
1115 for (NBOMap::iterator i(nbo_map_.begin()); i != nbo_map_.end(); ++i)
1116 {
1117 NBOEntry& e(i->second);
1118 e.clear_ended();
1119 e.nbo_ctx()->set_aborted(true);
1120 }
1121 }
1122
1123 wsrep_seqno_t
increment_position()1124 galera::Certification::increment_position()
1125 {
1126 gu::Lock lock(mutex_);
1127 position_++;
1128 return position_;
1129 }
1130
1131 galera::Certification::TestResult
test(const TrxHandleSlavePtr & trx,bool store_keys)1132 galera::Certification::test(const TrxHandleSlavePtr& trx, bool store_keys)
1133 {
1134 assert(trx->global_seqno() >= 0 /* && trx->local_seqno() >= 0 */);
1135
1136 const TestResult ret
1137 (trx->preordered() ?
1138 do_test_preordered(trx.get()) : do_test(trx, store_keys));
1139
1140 if (gu_unlikely(ret != TEST_OK)) { trx->mark_dummy(); }
1141
1142 return ret;
1143 }
1144
1145
get_safe_to_discard_seqno_() const1146 wsrep_seqno_t galera::Certification::get_safe_to_discard_seqno_() const
1147 {
1148 wsrep_seqno_t retval;
1149 if (deps_set_.empty() == true)
1150 {
1151 retval = safe_to_discard_seqno_;
1152 }
1153 else
1154 {
1155 retval = (*deps_set_.begin()) - 1;
1156 }
1157 return retval;
1158 }
1159
1160
1161 wsrep_seqno_t
purge_trxs_upto_(wsrep_seqno_t const seqno,bool const handle_gcache)1162 galera::Certification::purge_trxs_upto_(wsrep_seqno_t const seqno,
1163 bool const handle_gcache)
1164 {
1165 assert (seqno > 0);
1166
1167 TrxMap::iterator purge_bound(trx_map_.upper_bound(seqno));
1168
1169 cert_debug << "purging index up to " << seqno;
1170
1171 for_each(trx_map_.begin(), purge_bound, PurgeAndDiscard(*this));
1172 trx_map_.erase(trx_map_.begin(), purge_bound);
1173
1174 if (handle_gcache && service_thd_) service_thd_->release_seqno(seqno);
1175
1176 if (0 == ((trx_map_.size() + 1) % 10000))
1177 {
1178 log_debug << "trx map after purge: length: " << trx_map_.size()
1179 << ", requested purge seqno: " << seqno
1180 << ", real purge seqno: " << trx_map_.begin()->first - 1;
1181 }
1182
1183 return seqno;
1184 }
1185
1186
1187 galera::Certification::TestResult
append_trx(const TrxHandleSlavePtr & trx)1188 galera::Certification::append_trx(const TrxHandleSlavePtr& trx)
1189 {
1190 // explicit ROLLBACK is dummy() assert(!trx->is_dummy());
1191 assert(trx->global_seqno() >= 0 /* && trx->local_seqno() >= 0 */);
1192 assert(trx->global_seqno() > position_);
1193
1194 #ifndef NDEBUG
1195 bool const explicit_rollback(trx->explicit_rollback());
1196 #endif /* NDEBUG */
1197
1198 {
1199 gu::Lock lock(mutex_);
1200
1201 if (gu_unlikely(trx->global_seqno() != position_ + 1))
1202 {
1203 // this is perfectly normal if trx is rolled back just after
1204 // replication, keeping the log though
1205 log_debug << "seqno gap, position: " << position_
1206 << " trx seqno " << trx->global_seqno();
1207 }
1208
1209 if (gu_unlikely((trx->last_seen_seqno() + 1) < trx_map_.begin()->first))
1210 {
1211 /* See #733 - for now it is false positive */
1212 cert_debug
1213 << "WARNING: last_seen_seqno is below certification index: "
1214 << trx_map_.begin()->first << " > " << trx->last_seen_seqno();
1215 }
1216
1217 position_ = trx->global_seqno();
1218
1219 if (gu_unlikely(!(position_ & max_length_check_) &&
1220 (trx_map_.size() > static_cast<size_t>(max_length_))))
1221 {
1222 log_debug << "trx map size: " << trx_map_.size()
1223 << " - check if status.last_committed is incrementing";
1224
1225 wsrep_seqno_t trim_seqno(position_ - max_length_);
1226 wsrep_seqno_t const stds (get_safe_to_discard_seqno_());
1227
1228 if (trim_seqno > stds)
1229 {
1230 log_warn << "Attempt to trim certification index at "
1231 << trim_seqno << ", above safe-to-discard: " << stds;
1232 trim_seqno = stds;
1233 }
1234 else
1235 {
1236 cert_debug << "purging index up to " << trim_seqno;
1237 }
1238
1239 purge_trxs_upto_(trim_seqno, true);
1240 }
1241 }
1242
1243 const TestResult retval(test(trx, true));
1244
1245 {
1246 assert(trx->global_seqno() > 0);
1247
1248 gu::Lock lock(mutex_);
1249 if (trx_map_.insert(
1250 std::make_pair(trx->global_seqno(), trx)).second == false)
1251 gu_throw_fatal << "duplicate trx entry " << *trx;
1252
1253 // trx with local seqno WSREP_SEQNO_UNDEFINED originates from
1254 // IST so deps set tracking should not be done
1255 if (trx->local_seqno() != WSREP_SEQNO_UNDEFINED)
1256 {
1257 assert(trx->last_seen_seqno() != WSREP_SEQNO_UNDEFINED);
1258 deps_set_.insert(trx->last_seen_seqno());
1259 assert(deps_set_.size() <= trx_map_.size());
1260 }
1261 }
1262
1263 if (!trx->certified()) trx->mark_certified();
1264
1265 #ifndef NDEBUG
1266 if (explicit_rollback)
1267 {
1268 assert(trx->explicit_rollback());
1269 assert(retval == TEST_OK);
1270 assert(trx->state() == TrxHandle::S_CERTIFYING);
1271 }
1272 #endif /* NDEBUG */
1273
1274 return retval;
1275 }
1276
1277
set_trx_committed(TrxHandleSlave & trx)1278 wsrep_seqno_t galera::Certification::set_trx_committed(TrxHandleSlave& trx)
1279 {
1280 assert(trx.global_seqno() >= 0);
1281 assert(trx.is_committed() == false);
1282
1283 wsrep_seqno_t ret(WSREP_SEQNO_UNDEFINED);
1284 {
1285 gu::Lock lock(mutex_);
1286
1287 // certified trx with local seqno WSREP_SEQNO_UNDEFINED originates from
1288 // IST so deps set tracking should not be done
1289 // Local explicit rollback events bypassed certificaiton
1290 if (trx.certified() == true &&
1291 trx.local_seqno() != WSREP_SEQNO_UNDEFINED &&
1292 !trx.cert_bypass())
1293 {
1294 assert(trx.last_seen_seqno() != WSREP_SEQNO_UNDEFINED);
1295 DepsSet::iterator i(deps_set_.find(trx.last_seen_seqno()));
1296 assert(i != deps_set_.end());
1297
1298 if (deps_set_.size() == 1) safe_to_discard_seqno_ = *i;
1299
1300 deps_set_.erase(i);
1301 }
1302
1303 if (gu_unlikely(index_purge_required()))
1304 {
1305 ret = get_safe_to_discard_seqno_();
1306 }
1307 }
1308
1309 trx.mark_committed();
1310
1311 return ret;
1312 }
1313
1314 void
set_boolean_parameter(bool & param,const std::string & value,const std::string & param_name,const std::string & change_msg)1315 set_boolean_parameter(bool& param,
1316 const std::string& value,
1317 const std::string& param_name,
1318 const std::string& change_msg)
1319 {
1320 try
1321 {
1322 bool const old(param);
1323 param = gu::Config::from_config<bool>(value);
1324 if (old != param)
1325 {
1326 log_info << (param ? "Enabled " : "Disabled ") << change_msg;
1327 }
1328 }
1329 catch (gu::NotFound& e)
1330 {
1331 gu_throw_error(EINVAL) << "Bad value '" << value
1332 << "' for boolean parameter '"
1333 << param_name << '\'';
1334 }
1335 }
1336
1337 void
param_set(const std::string & key,const std::string & value)1338 galera::Certification::param_set(const std::string& key,
1339 const std::string& value)
1340 {
1341 if (key == Certification::PARAM_LOG_CONFLICTS)
1342 {
1343 set_boolean_parameter(log_conflicts_, value, CERT_PARAM_LOG_CONFLICTS,
1344 "logging of certification conflicts.");
1345 }
1346 else if (key == Certification::PARAM_OPTIMISTIC_PA)
1347 {
1348 set_boolean_parameter(optimistic_pa_, value, CERT_PARAM_OPTIMISTIC_PA,
1349 "\"optimistic\" parallel applying.");
1350 }
1351 else
1352 {
1353 throw gu::NotFound();
1354 }
1355
1356 conf_.set(key, value);
1357 }
1358
1359 void
mark_inconsistent()1360 galera::Certification::mark_inconsistent()
1361 {
1362 gu::Lock lock(mutex_);
1363 assert(!inconsistent_);
1364 inconsistent_ = true;
1365 }
1366