1 /*
2 * Copyright (C) 2018-2019 Codership Oy <info@codership.com>
3 *
4 * This file is part of wsrep-lib.
5 *
6 * Wsrep-lib is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * Wsrep-lib is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with wsrep-lib. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20 #include "wsrep/client_state.hpp"
21 #include "wsrep/compiler.hpp"
22 #include "wsrep/logger.hpp"
23
24 #include <unistd.h> // usleep()
25 #include <sstream>
26 #include <iostream>
27
provider() const28 wsrep::provider& wsrep::client_state::provider() const
29 {
30 return server_state_.provider();
31 }
32
open(wsrep::client_id id)33 void wsrep::client_state::open(wsrep::client_id id)
34 {
35 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
36 assert(state_ == s_none);
37 assert(keep_command_error_ == false);
38 debug_log_state("open: enter");
39 owning_thread_id_ = wsrep::this_thread::get_id();
40 rollbacker_active_ = false;
41 sync_wait_gtid_ = wsrep::gtid::undefined();
42 last_written_gtid_ = wsrep::gtid::undefined();
43 state(lock, s_idle);
44 id_ = id;
45 debug_log_state("open: leave");
46 }
47
close()48 void wsrep::client_state::close()
49 {
50 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
51 debug_log_state("close: enter");
52
53 while (mode_ == m_local && is_rollbacker_active()) {
54 cond_.wait(lock);
55 }
56 do_acquire_ownership(lock);
57
58 state(lock, s_quitting);
59 keep_command_error_ = false;
60 lock.unlock();
61 if (transaction_.active() &&
62 (mode_ != m_local ||
63 transaction_.state() != wsrep::transaction::s_prepared))
64 {
65 client_service_.bf_rollback();
66 transaction_.after_statement();
67 }
68 if (mode_ == m_local)
69 {
70 disable_streaming();
71 }
72 debug_log_state("close: leave");
73 }
74
cleanup()75 void wsrep::client_state::cleanup()
76 {
77 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
78 cleanup(lock);
79 }
80
cleanup(wsrep::unique_lock<wsrep::mutex> & lock)81 void wsrep::client_state::cleanup(wsrep::unique_lock<wsrep::mutex>& lock)
82 {
83 debug_log_state("cleanup: enter");
84 state(lock, s_none);
85 debug_log_state("cleanup: leave");
86 }
87
override_error(enum wsrep::client_error error,enum wsrep::provider::status status)88 void wsrep::client_state::override_error(enum wsrep::client_error error,
89 enum wsrep::provider::status status)
90 {
91 assert(wsrep::this_thread::get_id() == owning_thread_id_);
92 // Error state should not be cleared with success code without
93 // explicit reset_error() call.
94 assert(current_error_ == wsrep::e_success ||
95 error != wsrep::e_success);
96 current_error_ = error;
97 current_error_status_ = status;
98 }
99
before_command(bool keep_command_error)100 int wsrep::client_state::before_command(bool keep_command_error)
101 {
102 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
103 debug_log_state("before_command: enter");
104 // If the state is s_exec, the processing thread has already grabbed
105 // control with wait_rollback_complete_and_acquire_ownership()
106 if (state_ != s_exec)
107 {
108 assert(state_ == s_idle);
109 do_wait_rollback_complete_and_acquire_ownership(lock);
110 assert(state_ == s_exec);
111 client_service_.store_globals();
112 }
113 else
114 {
115 // This thread must have acquired control by other means,
116 // for example via wait_rollback_complete_and_acquire_ownership().
117 assert(wsrep::this_thread::get_id() == owning_thread_id_);
118 }
119
120 keep_command_error_ = keep_command_error;
121
122 // If the transaction is active, it must be either executing,
123 // aborted as rolled back by rollbacker, or must_abort if the
124 // client thread gained control via
125 // wait_rollback_complete_and_acquire_ownership()
126 // just before BF abort happened.
127 assert(transaction_.active() == false ||
128 (transaction_.state() == wsrep::transaction::s_executing ||
129 transaction_.state() == wsrep::transaction::s_prepared ||
130 transaction_.state() == wsrep::transaction::s_aborted ||
131 transaction_.state() == wsrep::transaction::s_must_abort));
132
133 if (transaction_.active())
134 {
135 if (transaction_.state() == wsrep::transaction::s_must_abort ||
136 transaction_.state() == wsrep::transaction::s_aborted)
137 {
138 if (transaction_.is_xa())
139 {
140 // Client will rollback explicitly, return error.
141 debug_log_state("before_command: error");
142 return 1;
143 }
144
145 override_error(wsrep::e_deadlock_error);
146 if (transaction_.state() == wsrep::transaction::s_must_abort)
147 {
148 lock.unlock();
149 client_service_.bf_rollback();
150 lock.lock();
151
152 }
153
154 if (keep_command_error_)
155 {
156 // Keep the error for the next command
157 debug_log_state("before_command: keep error");
158 return 0;
159 }
160
161 // Clean up the transaction and return error.
162 lock.unlock();
163 (void)transaction_.after_statement();
164 lock.lock();
165
166 assert(transaction_.active() == false);
167 assert(transaction_.state() == wsrep::transaction::s_aborted);
168 assert(current_error() != wsrep::e_success);
169
170 debug_log_state("before_command: error");
171 return 1;
172 }
173 }
174 debug_log_state("before_command: success");
175 return 0;
176 }
177
after_command_before_result()178 void wsrep::client_state::after_command_before_result()
179 {
180 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
181 debug_log_state("after_command_before_result: enter");
182 assert(state() == s_exec);
183 if (transaction_.active() &&
184 transaction_.state() == wsrep::transaction::s_must_abort)
185 {
186 transaction_.after_command_must_abort(lock);
187 // If keep current error is set, the result will be propagated
188 // back to client with some future command, so keep the transaction
189 // open here so that error handling can happen in before_command()
190 // hook.
191 if (not keep_command_error_)
192 {
193 lock.unlock();
194 (void)transaction_.after_statement();
195 lock.lock();
196 }
197
198 assert(transaction_.state() == wsrep::transaction::s_aborted);
199 }
200 state(lock, s_result);
201 debug_log_state("after_command_before_result: leave");
202 }
203
after_command_after_result()204 void wsrep::client_state::after_command_after_result()
205 {
206 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
207 debug_log_state("after_command_after_result_enter");
208 assert(state() == s_result);
209 assert(transaction_.state() != wsrep::transaction::s_aborting);
210 if (transaction_.active() &&
211 transaction_.state() == wsrep::transaction::s_must_abort)
212 {
213 transaction_.after_command_must_abort(lock);
214 assert(transaction_.state() == wsrep::transaction::s_aborted);
215 }
216 else if (transaction_.active() == false && not keep_command_error_)
217 {
218 current_error_ = wsrep::e_success;
219 current_error_status_ = wsrep::provider::success;
220 }
221 keep_command_error_ = false;
222 sync_wait_gtid_ = wsrep::gtid::undefined();
223 state(lock, s_idle);
224 debug_log_state("after_command_after_result: leave");
225 }
226
before_statement()227 int wsrep::client_state::before_statement()
228 {
229 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
230 debug_log_state("before_statement: enter");
231 #if 0
232 /**
233 * @todo It might be beneficial to implement timed wait for
234 * server synced state.
235 */
236 if (allow_dirty_reads_ == false &&
237 server_state_.state() != wsrep::server_state::s_synced)
238 {
239 return 1;
240 }
241 #endif // 0
242
243 if (transaction_.active() &&
244 transaction_.state() == wsrep::transaction::s_must_abort)
245 {
246 // Rollback and cleanup will happen in after_command_before_result()
247 debug_log_state("before_statement_error");
248 return 1;
249 }
250 debug_log_state("before_statement: success");
251 return 0;
252 }
253
after_statement()254 int wsrep::client_state::after_statement()
255 {
256 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
257 debug_log_state("after_statement: enter");
258 assert(state() == s_exec);
259 assert(mode() == m_local);
260
261 if (transaction_.active() &&
262 transaction_.state() == wsrep::transaction::s_must_abort)
263 {
264 lock.unlock();
265 client_service_.bf_rollback();
266 lock.lock();
267 assert(transaction_.state() == wsrep::transaction::s_aborted);
268 // Error may be set already. For example, if fragment size
269 // exceeded the maximum size in certify_fragment(), then
270 // we already have wsrep::e_error_during_commit
271 if (current_error() == wsrep::e_success)
272 {
273 override_error(wsrep::e_deadlock_error);
274 }
275 }
276 lock.unlock();
277
278 (void)transaction_.after_statement();
279 if (current_error() == wsrep::e_deadlock_error)
280 {
281 if (mode_ == m_local)
282 {
283 debug_log_state("after_statement: may_retry");
284 return 1;
285 }
286 else
287 {
288 debug_log_state("after_statement: error");
289 return 1;
290 }
291 }
292 debug_log_state("after_statement: success");
293 return 0;
294 }
295
296 //////////////////////////////////////////////////////////////////////////////
297 // Rollbacker synchronization //
298 //////////////////////////////////////////////////////////////////////////////
299
sync_rollback_complete()300 void wsrep::client_state::sync_rollback_complete()
301 {
302 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
303 debug_log_state("sync_rollback_complete: enter");
304 assert(state_ == s_idle && mode_ == m_local &&
305 transaction_.state() == wsrep::transaction::s_aborted);
306 set_rollbacker_active(false);
307 cond_.notify_all();
308 debug_log_state("sync_rollback_complete: leave");
309 }
310
wait_rollback_complete_and_acquire_ownership()311 void wsrep::client_state::wait_rollback_complete_and_acquire_ownership()
312 {
313 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
314 debug_log_state("wait_rollback_complete_and_acquire_ownership: enter");
315 if (state_ == s_idle)
316 {
317 do_wait_rollback_complete_and_acquire_ownership(lock);
318 }
319 assert(state_ == s_exec);
320 debug_log_state("wait_rollback_complete_and_acquire_ownership: leave");
321 }
322
323 //////////////////////////////////////////////////////////////////////////////
324 // Streaming //
325 //////////////////////////////////////////////////////////////////////////////
326
streaming_params(enum wsrep::streaming_context::fragment_unit fragment_unit,size_t fragment_size)327 void wsrep::client_state::streaming_params(
328 enum wsrep::streaming_context::fragment_unit fragment_unit,
329 size_t fragment_size)
330 {
331 assert(mode_ == m_local);
332 transaction_.streaming_context().params(fragment_unit, fragment_size);
333 }
334
enable_streaming(enum wsrep::streaming_context::fragment_unit fragment_unit,size_t fragment_size)335 int wsrep::client_state::enable_streaming(
336 enum wsrep::streaming_context::fragment_unit
337 fragment_unit,
338 size_t fragment_size)
339 {
340 assert(mode_ == m_local);
341 if (transaction_.is_streaming() &&
342 transaction_.streaming_context().fragment_unit() !=
343 fragment_unit)
344 {
345 wsrep::log_error()
346 << "Changing fragment unit for active streaming transaction "
347 << "not allowed";
348 return 1;
349 }
350 transaction_.streaming_context().enable(fragment_unit, fragment_size);
351 return 0;
352 }
353
disable_streaming()354 void wsrep::client_state::disable_streaming()
355 {
356 assert(mode_ == m_local);
357 assert(state_ == s_exec || state_ == s_quitting);
358 transaction_.streaming_context().disable();
359 }
360
361 //////////////////////////////////////////////////////////////////////////////
362 // TOI //
363 //////////////////////////////////////////////////////////////////////////////
364
365 enum wsrep::provider::status
poll_enter_toi(wsrep::unique_lock<wsrep::mutex> & lock,const wsrep::key_array & keys,const wsrep::const_buffer & buffer,wsrep::ws_meta & meta,int flags,std::chrono::time_point<wsrep::clock> wait_until,bool & timed_out)366 wsrep::client_state::poll_enter_toi(
367 wsrep::unique_lock<wsrep::mutex>& lock,
368 const wsrep::key_array& keys,
369 const wsrep::const_buffer& buffer,
370 wsrep::ws_meta& meta,
371 int flags,
372 std::chrono::time_point<wsrep::clock> wait_until,
373 bool& timed_out)
374 {
375 WSREP_LOG_DEBUG(debug_log_level(),
376 wsrep::log::debug_level_client_state,
377 "poll_enter_toi: "
378 << flags
379 << ","
380 << wait_until.time_since_epoch().count());
381 enum wsrep::provider::status status;
382 timed_out = false;
383 wsrep::ws_meta poll_meta; // tmp var for polling, as enter_toi may clear meta arg on errors
384 do
385 {
386 lock.unlock();
387 poll_meta = meta;
388 status = provider().enter_toi(id_, keys, buffer, poll_meta, flags);
389 if (status != wsrep::provider::success &&
390 not poll_meta.gtid().is_undefined())
391 {
392 // Successfully entered TOI, but the provider reported failure.
393 // This may happen for example if certification fails.
394 // Leave TOI before proceeding.
395 if (provider().leave_toi(id_, wsrep::mutable_buffer()))
396 {
397 wsrep::log_warning()
398 << "Failed to leave TOI after failure in "
399 << "poll_enter_toi()";
400 }
401 poll_meta = wsrep::ws_meta();
402 }
403 if (status == wsrep::provider::error_certification_failed ||
404 status == wsrep::provider::error_connection_failed)
405 {
406 ::usleep(300000);
407 }
408 lock.lock();
409 timed_out = !(wait_until.time_since_epoch().count() &&
410 wsrep::clock::now() < wait_until);
411 }
412 while ((status == wsrep::provider::error_certification_failed ||
413 status == wsrep::provider::error_connection_failed) &&
414 not timed_out &&
415 not client_service_.interrupted(lock));
416 meta = poll_meta;
417 return status;
418 }
419
enter_toi_common(wsrep::unique_lock<wsrep::mutex> & lock)420 void wsrep::client_state::enter_toi_common(
421 wsrep::unique_lock<wsrep::mutex>& lock)
422 {
423 assert(lock.owns_lock());
424 toi_mode_ = mode_;
425 mode(lock, m_toi);
426 }
427
enter_toi_local(const wsrep::key_array & keys,const wsrep::const_buffer & buffer,std::chrono::time_point<wsrep::clock> wait_until)428 int wsrep::client_state::enter_toi_local(const wsrep::key_array& keys,
429 const wsrep::const_buffer& buffer,
430 std::chrono::time_point<wsrep::clock> wait_until)
431 {
432 debug_log_state("enter_toi_local: enter");
433 assert(state_ == s_exec);
434 assert(mode_ == m_local);
435 int ret;
436
437 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
438
439 bool timed_out;
440 auto const status(poll_enter_toi(
441 lock, keys, buffer,
442 toi_meta_,
443 wsrep::provider::flag::start_transaction |
444 wsrep::provider::flag::commit,
445 wait_until,
446 timed_out));
447 switch (status)
448 {
449 case wsrep::provider::success:
450 {
451 enter_toi_common(lock);
452 ret = 0;
453 break;
454 }
455 case wsrep::provider::error_certification_failed:
456 override_error(e_deadlock_error, status);
457 ret = 1;
458 break;
459 default:
460 if (timed_out) {
461 override_error(e_timeout_error);
462 } else {
463 override_error(e_error_during_commit, status);
464 }
465 ret = 1;
466 break;
467 }
468
469 debug_log_state("enter_toi_local: leave");
470 return ret;
471 }
472
enter_toi_mode(const wsrep::ws_meta & ws_meta)473 void wsrep::client_state::enter_toi_mode(const wsrep::ws_meta& ws_meta)
474 {
475 debug_log_state("enter_toi_mode: enter");
476 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
477 assert(mode_ == m_high_priority);
478 enter_toi_common(lock);
479 toi_meta_ = ws_meta;
480 debug_log_state("enter_toi_mode: leave");
481 }
482
leave_toi_common()483 void wsrep::client_state::leave_toi_common()
484 {
485 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
486 mode(lock, toi_mode_);
487 toi_mode_ = m_undefined;
488 if (toi_meta_.gtid().is_undefined() == false)
489 {
490 update_last_written_gtid(toi_meta_.gtid());
491 }
492 toi_meta_ = wsrep::ws_meta();
493 }
494
leave_toi_local(const wsrep::mutable_buffer & err)495 int wsrep::client_state::leave_toi_local(const wsrep::mutable_buffer& err)
496 {
497 debug_log_state("leave_toi_local: enter");
498 assert(toi_mode_ == m_local);
499 leave_toi_common();
500
501 debug_log_state("leave_toi_local: leave");
502 return (provider().leave_toi(id_, err) == provider::success ? 0 : 1);
503 }
504
leave_toi_mode()505 void wsrep::client_state::leave_toi_mode()
506 {
507 debug_log_state("leave_toi_mode: enter");
508 assert(toi_mode_ == m_high_priority);
509 leave_toi_common();
510 debug_log_state("leave_toi_mode: leave");
511 }
512
513 ///////////////////////////////////////////////////////////////////////////////
514 // RSU //
515 ///////////////////////////////////////////////////////////////////////////////
516
begin_rsu(int timeout)517 int wsrep::client_state::begin_rsu(int timeout)
518 {
519 if (server_state_.desync())
520 {
521 wsrep::log_warning() << "Failed to desync server";
522 return 1;
523 }
524 if (server_state_.server_service().wait_committing_transactions(timeout))
525 {
526 wsrep::log_warning() << "RSU failed due to pending transactions";
527 server_state_.resync();
528 return 1;
529 }
530 wsrep::seqno pause_seqno(server_state_.pause());
531 if (pause_seqno.is_undefined())
532 {
533 wsrep::log_warning() << "Failed to pause provider";
534 server_state_.resync();
535 return 1;
536 }
537 wsrep::log_info() << "Provider paused at: " << pause_seqno;
538 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
539 toi_mode_ = mode_;
540 mode(lock, m_rsu);
541 return 0;
542 }
543
end_rsu()544 int wsrep::client_state::end_rsu()
545 {
546 int ret(0);
547 try
548 {
549 server_state_.resume();
550 server_state_.resync();
551 }
552 catch (const wsrep::runtime_error& e)
553 {
554 wsrep::log_warning() << "End RSU failed: " << e.what();
555 ret = 1;
556 }
557 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
558 mode(lock, toi_mode_);
559 toi_mode_ = m_undefined;
560 return ret;
561 }
562
563 ///////////////////////////////////////////////////////////////////////////////
564 // NBO //
565 ///////////////////////////////////////////////////////////////////////////////
566
begin_nbo_phase_one(const wsrep::key_array & keys,const wsrep::const_buffer & buffer,std::chrono::time_point<wsrep::clock> wait_until)567 int wsrep::client_state::begin_nbo_phase_one(
568 const wsrep::key_array& keys,
569 const wsrep::const_buffer& buffer,
570 std::chrono::time_point<wsrep::clock> wait_until)
571 {
572 debug_log_state("begin_nbo_phase_one: enter");
573 debug_log_keys(keys);
574 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
575 assert(state_ == s_exec);
576 assert(mode_ == m_local);
577 assert(toi_mode_ == m_undefined);
578
579 int ret;
580 bool timed_out;
581 auto const status(poll_enter_toi(
582 lock, keys, buffer,
583 toi_meta_,
584 wsrep::provider::flag::start_transaction,
585 wait_until,
586 timed_out));
587 switch (status)
588 {
589 case wsrep::provider::success:
590 toi_mode_ = mode_;
591 mode(lock, m_nbo);
592 ret= 0;
593 break;
594 case wsrep::provider::error_certification_failed:
595 override_error(e_deadlock_error, status);
596 ret = 1;
597 break;
598 default:
599 if (timed_out) {
600 override_error(e_timeout_error);
601 } else {
602 override_error(e_error_during_commit, status);
603 }
604 ret = 1;
605 break;
606 }
607
608 debug_log_state("begin_nbo_phase_one: leave");
609 return ret;
610 }
611
end_nbo_phase_one(const wsrep::mutable_buffer & err)612 int wsrep::client_state::end_nbo_phase_one(const wsrep::mutable_buffer& err)
613 {
614 debug_log_state("end_nbo_phase_one: enter");
615 assert(state_ == s_exec);
616 assert(mode_ == m_nbo);
617 assert(in_toi());
618
619 enum wsrep::provider::status status(provider().leave_toi(id_, err));
620 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
621 int ret;
622 switch (status)
623 {
624 case wsrep::provider::success:
625 ret = 0;
626 break;
627 default:
628 override_error(e_error_during_commit, status);
629 ret = 1;
630 break;
631 }
632 nbo_meta_ = toi_meta_;
633 toi_meta_ = wsrep::ws_meta();
634 toi_mode_ = m_undefined;
635 debug_log_state("end_nbo_phase_one: leave");
636 return ret;
637 }
638
enter_nbo_mode(const wsrep::ws_meta & ws_meta)639 int wsrep::client_state::enter_nbo_mode(const wsrep::ws_meta& ws_meta)
640 {
641 assert(state_ == s_exec);
642 assert(mode_ == m_local);
643 assert(toi_mode_ == m_undefined);
644 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
645 nbo_meta_ = ws_meta;
646 mode(lock, m_nbo);
647 return 0;
648 }
649
begin_nbo_phase_two(const wsrep::key_array & keys,std::chrono::time_point<wsrep::clock> wait_until)650 int wsrep::client_state::begin_nbo_phase_two(
651 const wsrep::key_array& keys,
652 std::chrono::time_point<wsrep::clock> wait_until)
653 {
654 debug_log_state("begin_nbo_phase_two: enter");
655 debug_log_keys(keys);
656 assert(state_ == s_exec);
657 assert(mode_ == m_nbo);
658 assert(toi_mode_ == m_undefined);
659 assert(!in_toi());
660
661 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
662 // Note: nbo_meta_ is passed to enter_toi() as it is
663 // an input param containing gtid of NBO begin.
664 // Output stored in nbo_meta_ is copied to toi_meta_ for
665 // phase two end.
666 bool timed_out;
667 enum wsrep::provider::status status(
668 poll_enter_toi(lock, keys,
669 wsrep::const_buffer(),
670 nbo_meta_,
671 wsrep::provider::flag::commit,
672 wait_until,
673 timed_out));
674 int ret;
675 switch (status)
676 {
677 case wsrep::provider::success:
678 ret= 0;
679 toi_meta_ = nbo_meta_;
680 toi_mode_ = m_local;
681 break;
682 case wsrep::provider::error_provider_failed:
683 override_error(e_interrupted_error, status);
684 ret= 1;
685 break;
686 default:
687 if (timed_out)
688 {
689 override_error(e_timeout_error, status);
690 }
691 else
692 {
693 override_error(e_error_during_commit, status);
694 }
695 ret= 1;
696 break;
697 }
698
699 // Failed to grab TOI for completing NBO in order. This means that
700 // the operation cannot be ended in total order, so we end the
701 // NBO mode and let the DBMS to deal with the error.
702 if (ret)
703 {
704 mode(lock, m_local);
705 nbo_meta_ = wsrep::ws_meta();
706 }
707
708 debug_log_state("begin_nbo_phase_two: leave");
709 return ret;
710 }
711
end_nbo_phase_two(const wsrep::mutable_buffer & err)712 int wsrep::client_state::end_nbo_phase_two(const wsrep::mutable_buffer& err)
713 {
714 debug_log_state("end_nbo_phase_two: enter");
715 assert(state_ == s_exec);
716 assert(mode_ == m_nbo);
717 assert(toi_mode_ == m_local);
718 assert(in_toi());
719 enum wsrep::provider::status status(
720 provider().leave_toi(id_, err));
721 wsrep::unique_lock<wsrep::mutex> lock(mutex_);
722 int ret;
723 switch (status)
724 {
725 case wsrep::provider::success:
726 ret = 0;
727 break;
728 default:
729 override_error(e_error_during_commit, status);
730 ret = 1;
731 break;
732 }
733 toi_meta_ = wsrep::ws_meta();
734 toi_mode_ = m_undefined;
735 nbo_meta_ = wsrep::ws_meta();
736 mode(lock, m_local);
737 debug_log_state("end_nbo_phase_two: leave");
738 return ret;
739 }
740 ///////////////////////////////////////////////////////////////////////////////
741 // Misc //
742 ///////////////////////////////////////////////////////////////////////////////
743
sync_wait(int timeout)744 int wsrep::client_state::sync_wait(int timeout)
745 {
746 std::pair<wsrep::gtid, enum wsrep::provider::status> result(
747 server_state_.causal_read(timeout));
748 int ret(1);
749 switch (result.second)
750 {
751 case wsrep::provider::success:
752 sync_wait_gtid_ = result.first;
753 ret = 0;
754 break;
755 case wsrep::provider::error_not_implemented:
756 override_error(wsrep::e_not_supported_error);
757 break;
758 default:
759 override_error(wsrep::e_timeout_error);
760 break;
761 }
762 return ret;
763 }
764
765 ///////////////////////////////////////////////////////////////////////////////
766 // Private //
767 ///////////////////////////////////////////////////////////////////////////////
768
do_acquire_ownership(wsrep::unique_lock<wsrep::mutex> & lock WSREP_UNUSED)769 void wsrep::client_state::do_acquire_ownership(
770 wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED)
771 {
772 assert(lock.owns_lock());
773 // Be strict about client state for clients in local mode. The
774 // owning_thread_id_ is used to detect bugs which are caused by
775 // more than one thread operating the client state at the time,
776 // for example thread handling the client session and background
777 // rollbacker.
778 assert(state_ == s_idle || mode_ != m_local);
779 owning_thread_id_ = wsrep::this_thread::get_id();
780 }
781
do_wait_rollback_complete_and_acquire_ownership(wsrep::unique_lock<wsrep::mutex> & lock)782 void wsrep::client_state::do_wait_rollback_complete_and_acquire_ownership(
783 wsrep::unique_lock<wsrep::mutex>& lock)
784 {
785 assert(lock.owns_lock());
786 assert(state_ == s_idle);
787 while (is_rollbacker_active())
788 {
789 cond_.wait(lock);
790 }
791 do_acquire_ownership(lock);
792 state(lock, s_exec);
793 }
794
update_last_written_gtid(const wsrep::gtid & gtid)795 void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid)
796 {
797 assert(last_written_gtid_.is_undefined() ||
798 (last_written_gtid_.id() == gtid.id() &&
799 !(last_written_gtid_.seqno() > gtid.seqno())));
800 last_written_gtid_ = gtid;
801 }
802
debug_log_state(const char * context) const803 void wsrep::client_state::debug_log_state(const char* context) const
804 {
805 WSREP_LOG_DEBUG(debug_log_level(),
806 wsrep::log::debug_level_client_state,
807 context
808 << "(" << id_.get()
809 << "," << to_c_string(state_)
810 << "," << to_c_string(mode_)
811 << "," << wsrep::to_string(current_error_)
812 << "," << current_error_status_
813 << ",toi: " << toi_meta_.seqno()
814 << ",nbo: " << nbo_meta_.seqno() << ")");
815 }
816
debug_log_keys(const wsrep::key_array & keys) const817 void wsrep::client_state::debug_log_keys(const wsrep::key_array& keys) const
818 {
819 for (size_t i(0); i < keys.size(); ++i)
820 {
821 WSREP_LOG_DEBUG(debug_log_level(),
822 wsrep::log::debug_level_client_state,
823 "TOI keys: "
824 << " id: " << id_
825 << "key: " << keys[i]);
826 }
827 }
828
state(wsrep::unique_lock<wsrep::mutex> & lock WSREP_UNUSED,enum wsrep::client_state::state state)829 void wsrep::client_state::state(
830 wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
831 enum wsrep::client_state::state state)
832 {
833 // Verify that the current thread has gained control to the
834 // connection by calling before_command()
835 assert(wsrep::this_thread::get_id() == owning_thread_id_);
836 assert(lock.owns_lock());
837 static const char allowed[state_max_][state_max_] =
838 {
839 /* none idle exec result quit */
840 { 0, 1, 0, 0, 0}, /* none */
841 { 0, 0, 1, 0, 1}, /* idle */
842 { 0, 0, 0, 1, 0}, /* exec */
843 { 0, 1, 0, 0, 0}, /* result */
844 { 1, 0, 0, 0, 0} /* quit */
845 };
846 if (!allowed[state_][state])
847 {
848 wsrep::log_warning() << "client_state: Unallowed state transition: "
849 << state_ << " -> " << state;
850 assert(0);
851 }
852 state_hist_.push_back(state_);
853 state_ = state;
854 if (state_hist_.size() > 10)
855 {
856 state_hist_.erase(state_hist_.begin());
857 }
858
859 }
860
mode(wsrep::unique_lock<wsrep::mutex> & lock WSREP_UNUSED,enum mode mode)861 void wsrep::client_state::mode(
862 wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
863 enum mode mode)
864 {
865 assert(lock.owns_lock());
866
867 static const char allowed[n_modes_][n_modes_] =
868 { /* u l h t r n */
869 { 0, 0, 0, 0, 0, 0 }, /* undefined */
870 { 0, 0, 1, 1, 1, 1 }, /* local */
871 { 0, 1, 0, 1, 0, 1 }, /* high prio */
872 { 0, 1, 1, 0, 0, 0 }, /* toi */
873 { 0, 1, 0, 0, 0, 0 }, /* rsu */
874 { 0, 1, 1, 0, 0, 0 } /* nbo */
875 };
876 if (!allowed[mode_][mode])
877 {
878 wsrep::log_warning() << "client_state: Unallowed mode transition: "
879 << mode_ << " -> " << mode;
880 assert(0);
881 }
882 mode_ = mode;
883 }
884