1 /*
2  * Copyright (C) 2018-2019 Codership Oy <info@codership.com>
3  *
4  * This file is part of wsrep-lib.
5  *
6  * Wsrep-lib is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * Wsrep-lib is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with wsrep-lib.  If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #include "wsrep/client_state.hpp"
21 #include "wsrep/compiler.hpp"
22 #include "wsrep/logger.hpp"
23 
24 #include <unistd.h> // usleep()
25 #include <sstream>
26 #include <iostream>
27 
// Return the provider instance owned by the associated server state.
wsrep::provider& wsrep::client_state::provider() const
{
    return server_state_.provider();
}
32 
// Open the client state for use: reset per-session fields and move
// from s_none to s_idle. The calling thread becomes the owner of
// this client state.
void wsrep::client_state::open(wsrep::client_id id)
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    assert(state_ == s_none);
    assert(keep_command_error_ == false);
    debug_log_state("open: enter");
    // Ownership must be recorded before the state() call below, which
    // asserts that the caller owns the client state.
    owning_thread_id_ = wsrep::this_thread::get_id();
    rollbacker_active_ = false;
    sync_wait_gtid_ = wsrep::gtid::undefined();
    last_written_gtid_ = wsrep::gtid::undefined();
    state(lock, s_idle);
    id_ = id;
    debug_log_state("open: leave");
}
47 
// Close the client state: wait until a possible background rollbacker
// has finished, take over ownership, move to s_quitting and roll back
// any still-active transaction. A local-mode transaction in s_prepared
// state is deliberately left alone here.
void wsrep::client_state::close()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    debug_log_state("close: enter");

    // A rollbacker thread may be operating on a local client's
    // transaction; it signals cond_ when done.
    while (mode_ == m_local && is_rollbacker_active()) {
        cond_.wait(lock);
    }
    do_acquire_ownership(lock);

    state(lock, s_quitting);
    keep_command_error_ = false;
    // bf_rollback()/after_statement() call back into the client service,
    // so the mutex must not be held over them.
    lock.unlock();
    if (transaction_.active() &&
        (mode_ != m_local ||
         transaction_.state() != wsrep::transaction::s_prepared))
    {
        client_service_.bf_rollback();
        transaction_.after_statement();
    }
    if (mode_ == m_local)
    {
        disable_streaming();
    }
    debug_log_state("close: leave");
}
74 
// Locking variant of cleanup(): grab the mutex and delegate.
void wsrep::client_state::cleanup()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    cleanup(lock);
}
80 
// Final cleanup: move the client state back to s_none. The caller
// must hold the client state mutex.
void wsrep::client_state::cleanup(wsrep::unique_lock<wsrep::mutex>& lock)
{
    debug_log_state("cleanup: enter");
    state(lock, s_none);
    debug_log_state("cleanup: leave");
}
87 
override_error(enum wsrep::client_error error,enum wsrep::provider::status status)88 void wsrep::client_state::override_error(enum wsrep::client_error error,
89                                          enum wsrep::provider::status status)
90 {
91     assert(wsrep::this_thread::get_id() == owning_thread_id_);
92     // Error state should not be cleared with success code without
93     // explicit reset_error() call.
94     assert(current_error_ == wsrep::e_success ||
95            error != wsrep::e_success);
96     current_error_ = error;
97     current_error_status_ = status;
98 }
99 
// Prepare the client state for processing the next client command.
// Acquires ownership of the client state (waiting for a possible
// background rollback to finish) and handles a transaction which was
// BF aborted between commands. Returns 0 on success, 1 if the command
// must return an error to the client.
int wsrep::client_state::before_command(bool keep_command_error)
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    debug_log_state("before_command: enter");
    // If the state is s_exec, the processing thread has already grabbed
    // control with wait_rollback_complete_and_acquire_ownership()
    if (state_ != s_exec)
    {
        assert(state_ == s_idle);
        do_wait_rollback_complete_and_acquire_ownership(lock);
        assert(state_ == s_exec);
        client_service_.store_globals();
    }
    else
    {
        // This thread must have acquired control by other means,
        // for example via wait_rollback_complete_and_acquire_ownership().
        assert(wsrep::this_thread::get_id() == owning_thread_id_);
    }

    keep_command_error_ = keep_command_error;

    // If the transaction is active, it must be either executing,
    // aborted as rolled back by rollbacker, or must_abort if the
    // client thread gained control via
    // wait_rollback_complete_and_acquire_ownership()
    // just before BF abort happened.
    assert(transaction_.active() == false ||
           (transaction_.state() == wsrep::transaction::s_executing ||
            transaction_.state() == wsrep::transaction::s_prepared ||
            transaction_.state() == wsrep::transaction::s_aborted ||
            transaction_.state() == wsrep::transaction::s_must_abort));

    if (transaction_.active())
    {
        if (transaction_.state() == wsrep::transaction::s_must_abort ||
            transaction_.state() == wsrep::transaction::s_aborted)
        {
            if (transaction_.is_xa())
            {
                // Client will rollback explicitly, return error.
                debug_log_state("before_command: error");
                return 1;
            }

            override_error(wsrep::e_deadlock_error);
            if (transaction_.state() == wsrep::transaction::s_must_abort)
            {
                // bf_rollback() calls back into the client service, so
                // the mutex must be released for its duration.
                lock.unlock();
                client_service_.bf_rollback();
                lock.lock();

            }

            if (keep_command_error_)
            {
                // Keep the error for the next command
                debug_log_state("before_command: keep error");
                return 0;
            }

            // Clean up the transaction and return error.
            lock.unlock();
            (void)transaction_.after_statement();
            lock.lock();

            assert(transaction_.active() == false);
            assert(transaction_.state() == wsrep::transaction::s_aborted);
            assert(current_error() != wsrep::e_success);

            debug_log_state("before_command: error");
            return 1;
        }
    }
    debug_log_state("before_command: success");
    return 0;
}
177 
// Called after the command has been processed, before the result is
// returned to the client. Completes a BF abort which happened during
// command processing and moves the client state to s_result.
void wsrep::client_state::after_command_before_result()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    debug_log_state("after_command_before_result: enter");
    assert(state() == s_exec);
    if (transaction_.active() &&
        transaction_.state() == wsrep::transaction::s_must_abort)
    {
        transaction_.after_command_must_abort(lock);
        // If keep current error is set, the result will be propagated
        // back to client with some future command, so keep the transaction
        // open here so that error handling can happen in before_command()
        // hook.
        if (not keep_command_error_)
        {
            // after_statement() must run without the mutex held.
            lock.unlock();
            (void)transaction_.after_statement();
            lock.lock();
        }

        assert(transaction_.state() == wsrep::transaction::s_aborted);
    }
    state(lock, s_result);
    debug_log_state("after_command_before_result: leave");
}
203 
after_command_after_result()204 void wsrep::client_state::after_command_after_result()
205 {
206     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
207     debug_log_state("after_command_after_result_enter");
208     assert(state() == s_result);
209     assert(transaction_.state() != wsrep::transaction::s_aborting);
210     if (transaction_.active() &&
211         transaction_.state() == wsrep::transaction::s_must_abort)
212     {
213         transaction_.after_command_must_abort(lock);
214         assert(transaction_.state() == wsrep::transaction::s_aborted);
215     }
216     else if (transaction_.active() == false && not keep_command_error_)
217     {
218         current_error_ = wsrep::e_success;
219         current_error_status_ = wsrep::provider::success;
220     }
221     keep_command_error_ = false;
222     sync_wait_gtid_ = wsrep::gtid::undefined();
223     state(lock, s_idle);
224     debug_log_state("after_command_after_result: leave");
225 }
226 
before_statement()227 int wsrep::client_state::before_statement()
228 {
229     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
230     debug_log_state("before_statement: enter");
231 #if 0
232     /**
233      * @todo It might be beneficial to implement timed wait for
234      *       server synced state.
235      */
236     if (allow_dirty_reads_ == false &&
237         server_state_.state() != wsrep::server_state::s_synced)
238     {
239         return 1;
240     }
241 #endif // 0
242 
243     if (transaction_.active() &&
244         transaction_.state() == wsrep::transaction::s_must_abort)
245     {
246         // Rollback and cleanup will happen in after_command_before_result()
247         debug_log_state("before_statement_error");
248         return 1;
249     }
250     debug_log_state("before_statement: success");
251     return 0;
252 }
253 
// Called after a statement has been executed. Rolls back a BF-aborted
// transaction and finishes statement processing. Returns 1 on
// deadlock error (the local client may retry the statement),
// 0 otherwise.
int wsrep::client_state::after_statement()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    debug_log_state("after_statement: enter");
    assert(state() == s_exec);
    assert(mode() == m_local);

    if (transaction_.active() &&
        transaction_.state() == wsrep::transaction::s_must_abort)
    {
        // bf_rollback() calls back into the client service; release
        // the mutex for its duration.
        lock.unlock();
        client_service_.bf_rollback();
        lock.lock();
        assert(transaction_.state() == wsrep::transaction::s_aborted);
        // Error may be set already. For example, if fragment size
        // exceeded the maximum size in certify_fragment(), then
        // we already have wsrep::e_error_during_commit
        if (current_error() == wsrep::e_success)
        {
            override_error(wsrep::e_deadlock_error);
        }
    }
    lock.unlock();

    (void)transaction_.after_statement();
    if (current_error() == wsrep::e_deadlock_error)
    {
        // Both branches return 1; they are kept separate only so the
        // debug log distinguishes a retryable local deadlock from an
        // error in another mode.
        if (mode_ == m_local)
        {
            debug_log_state("after_statement: may_retry");
            return 1;
        }
        else
        {
            debug_log_state("after_statement: error");
            return 1;
        }
    }
    debug_log_state("after_statement: success");
    return 0;
}
295 
296 //////////////////////////////////////////////////////////////////////////////
297 //                      Rollbacker synchronization                          //
298 //////////////////////////////////////////////////////////////////////////////
299 
sync_rollback_complete()300 void wsrep::client_state::sync_rollback_complete()
301 {
302     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
303     debug_log_state("sync_rollback_complete: enter");
304     assert(state_ == s_idle && mode_ == m_local &&
305            transaction_.state() == wsrep::transaction::s_aborted);
306     set_rollbacker_active(false);
307     cond_.notify_all();
308     debug_log_state("sync_rollback_complete: leave");
309 }
310 
wait_rollback_complete_and_acquire_ownership()311 void wsrep::client_state::wait_rollback_complete_and_acquire_ownership()
312 {
313     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
314     debug_log_state("wait_rollback_complete_and_acquire_ownership: enter");
315     if (state_ == s_idle)
316     {
317         do_wait_rollback_complete_and_acquire_ownership(lock);
318     }
319     assert(state_ == s_exec);
320     debug_log_state("wait_rollback_complete_and_acquire_ownership: leave");
321 }
322 
323 //////////////////////////////////////////////////////////////////////////////
324 //                             Streaming                                    //
325 //////////////////////////////////////////////////////////////////////////////
326 
// Set streaming replication parameters (fragment unit and size) for
// the transaction. May only be called in local mode.
void wsrep::client_state::streaming_params(
    enum wsrep::streaming_context::fragment_unit fragment_unit,
    size_t fragment_size)
{
    assert(mode_ == m_local);
    transaction_.streaming_context().params(fragment_unit, fragment_size);
}
334 
enable_streaming(enum wsrep::streaming_context::fragment_unit fragment_unit,size_t fragment_size)335 int wsrep::client_state::enable_streaming(
336     enum wsrep::streaming_context::fragment_unit
337     fragment_unit,
338     size_t fragment_size)
339 {
340     assert(mode_ == m_local);
341     if (transaction_.is_streaming() &&
342         transaction_.streaming_context().fragment_unit() !=
343         fragment_unit)
344     {
345         wsrep::log_error()
346             << "Changing fragment unit for active streaming transaction "
347             << "not allowed";
348         return 1;
349     }
350     transaction_.streaming_context().enable(fragment_unit, fragment_size);
351     return 0;
352 }
353 
// Disable streaming replication for the transaction. May only be
// called in local mode while executing or quitting.
void wsrep::client_state::disable_streaming()
{
    assert(mode_ == m_local);
    assert(state_ == s_exec || state_ == s_quitting);
    transaction_.streaming_context().disable();
}
360 
361 //////////////////////////////////////////////////////////////////////////////
362 //                                 TOI                                      //
363 //////////////////////////////////////////////////////////////////////////////
364 
// Call provider enter_toi() in a poll loop, retrying on certification
// or connection failure until success, interruption, or the wait_until
// deadline passes. The client state mutex is released around provider
// calls and re-acquired before checking the exit conditions.
// NOTE: a zero wait_until makes timed_out true after the first
// attempt, so no polling takes place — presumably intentional
// (single-shot enter_toi); confirm against callers.
enum wsrep::provider::status
wsrep::client_state::poll_enter_toi(
    wsrep::unique_lock<wsrep::mutex>& lock,
    const wsrep::key_array& keys,
    const wsrep::const_buffer& buffer,
    wsrep::ws_meta& meta,
    int flags,
    std::chrono::time_point<wsrep::clock> wait_until,
    bool& timed_out)
{
    WSREP_LOG_DEBUG(debug_log_level(),
                    wsrep::log::debug_level_client_state,
                    "poll_enter_toi: "
                    << flags
                    << ","
                    << wait_until.time_since_epoch().count());
    enum wsrep::provider::status status;
    timed_out = false;
    wsrep::ws_meta poll_meta; // tmp var for polling, as enter_toi may clear meta arg on errors
    do
    {
        lock.unlock();
        poll_meta = meta;
        status = provider().enter_toi(id_, keys, buffer, poll_meta, flags);
        if (status != wsrep::provider::success &&
            not poll_meta.gtid().is_undefined())
        {
            // Successfully entered TOI, but the provider reported failure.
            // This may happen for example if certification fails.
            // Leave TOI before proceeding.
            if (provider().leave_toi(id_, wsrep::mutable_buffer()))
            {
                wsrep::log_warning()
                    << "Failed to leave TOI after failure in "
                    << "poll_enter_toi()";
            }
            poll_meta = wsrep::ws_meta();
        }
        if (status == wsrep::provider::error_certification_failed ||
            status == wsrep::provider::error_connection_failed)
        {
            // Back off for 300 ms before retrying a transient failure.
            ::usleep(300000);
        }
        lock.lock();
        timed_out = !(wait_until.time_since_epoch().count() &&
                      wsrep::clock::now() < wait_until);
    }
    while ((status == wsrep::provider::error_certification_failed ||
            status == wsrep::provider::error_connection_failed) &&
           not timed_out &&
           not client_service_.interrupted(lock));
    meta = poll_meta;
    return status;
}
419 
// Common bookkeeping for entering TOI: remember the current mode in
// toi_mode_ and switch to m_toi. The caller must hold the mutex.
void wsrep::client_state::enter_toi_common(
    wsrep::unique_lock<wsrep::mutex>& lock)
{
    assert(lock.owns_lock());
    toi_mode_ = mode_;
    mode(lock, m_toi);
}
427 
enter_toi_local(const wsrep::key_array & keys,const wsrep::const_buffer & buffer,std::chrono::time_point<wsrep::clock> wait_until)428 int wsrep::client_state::enter_toi_local(const wsrep::key_array& keys,
429                                          const wsrep::const_buffer& buffer,
430                                          std::chrono::time_point<wsrep::clock> wait_until)
431 {
432     debug_log_state("enter_toi_local: enter");
433     assert(state_ == s_exec);
434     assert(mode_ == m_local);
435     int ret;
436 
437     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
438 
439     bool timed_out;
440     auto const status(poll_enter_toi(
441                           lock, keys, buffer,
442                           toi_meta_,
443                           wsrep::provider::flag::start_transaction |
444                           wsrep::provider::flag::commit,
445                           wait_until,
446                           timed_out));
447     switch (status)
448     {
449     case wsrep::provider::success:
450     {
451         enter_toi_common(lock);
452         ret = 0;
453         break;
454     }
455     case wsrep::provider::error_certification_failed:
456         override_error(e_deadlock_error, status);
457         ret = 1;
458         break;
459     default:
460         if (timed_out) {
461             override_error(e_timeout_error);
462         } else {
463             override_error(e_error_during_commit, status);
464         }
465         ret = 1;
466         break;
467     }
468 
469     debug_log_state("enter_toi_local: leave");
470     return ret;
471 }
472 
enter_toi_mode(const wsrep::ws_meta & ws_meta)473 void wsrep::client_state::enter_toi_mode(const wsrep::ws_meta& ws_meta)
474 {
475     debug_log_state("enter_toi_mode: enter");
476     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
477     assert(mode_ == m_high_priority);
478     enter_toi_common(lock);
479     toi_meta_ = ws_meta;
480     debug_log_state("enter_toi_mode: leave");
481 }
482 
// Common bookkeeping for leaving TOI: restore the mode which was
// active before TOI, record the TOI GTID as last written (if defined)
// and clear the TOI meta data.
void wsrep::client_state::leave_toi_common()
{
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    mode(lock, toi_mode_);
    toi_mode_ = m_undefined;
    if (toi_meta_.gtid().is_undefined() == false)
    {
        update_last_written_gtid(toi_meta_.gtid());
    }
    toi_meta_ = wsrep::ws_meta();
}
494 
leave_toi_local(const wsrep::mutable_buffer & err)495 int wsrep::client_state::leave_toi_local(const wsrep::mutable_buffer& err)
496 {
497     debug_log_state("leave_toi_local: enter");
498     assert(toi_mode_ == m_local);
499     leave_toi_common();
500 
501     debug_log_state("leave_toi_local: leave");
502     return (provider().leave_toi(id_, err) == provider::success ? 0 : 1);
503 }
504 
// Leave TOI mode on a high priority (applier) client.
void wsrep::client_state::leave_toi_mode()
{
    debug_log_state("leave_toi_mode: enter");
    assert(toi_mode_ == m_high_priority);
    leave_toi_common();
    debug_log_state("leave_toi_mode: leave");
}
512 
513 ///////////////////////////////////////////////////////////////////////////////
514 //                                 RSU                                       //
515 ///////////////////////////////////////////////////////////////////////////////
516 
// Begin rolling schema upgrade: desync the server, wait for committing
// transactions to drain, pause the provider and switch to m_rsu mode.
// On any failure the server is resynced and 1 is returned.
int wsrep::client_state::begin_rsu(int timeout)
{
    if (server_state_.desync())
    {
        wsrep::log_warning() << "Failed to desync server";
        return 1;
    }
    if (server_state_.server_service().wait_committing_transactions(timeout))
    {
        wsrep::log_warning() << "RSU failed due to pending transactions";
        server_state_.resync();
        return 1;
    }
    wsrep::seqno pause_seqno(server_state_.pause());
    if (pause_seqno.is_undefined())
    {
        wsrep::log_warning() << "Failed to pause provider";
        server_state_.resync();
        return 1;
    }
    wsrep::log_info() << "Provider paused at: " << pause_seqno;
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    // Previous mode is preserved in toi_mode_ and restored in end_rsu().
    toi_mode_ = mode_;
    mode(lock, m_rsu);
    return 0;
}
543 
// End rolling schema upgrade: resume and resync the server, then
// restore the mode which was active before begin_rsu(). The mode is
// restored even if resume/resync throws; in that case 1 is returned.
int wsrep::client_state::end_rsu()
{
    int ret(0);
    try
    {
        server_state_.resume();
        server_state_.resync();
    }
    catch (const wsrep::runtime_error& e)
    {
        wsrep::log_warning() << "End RSU failed: " << e.what();
        ret = 1;
    }
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    mode(lock, toi_mode_);
    toi_mode_ = m_undefined;
    return ret;
}
562 
563 ///////////////////////////////////////////////////////////////////////////////
564 //                                 NBO                                       //
565 ///////////////////////////////////////////////////////////////////////////////
566 
// Begin phase one of a non-blocking operation: enter TOI with
// start_transaction flag only and switch to m_nbo mode on success.
// Returns 0 on success, 1 on error (client error is set accordingly).
int wsrep::client_state::begin_nbo_phase_one(
    const wsrep::key_array& keys,
    const wsrep::const_buffer& buffer,
    std::chrono::time_point<wsrep::clock> wait_until)
{
    debug_log_state("begin_nbo_phase_one: enter");
    debug_log_keys(keys);
    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    assert(state_ == s_exec);
    assert(mode_ == m_local);
    assert(toi_mode_ == m_undefined);

    int ret;
    bool timed_out;
    auto const status(poll_enter_toi(
                          lock, keys, buffer,
                          toi_meta_,
                          wsrep::provider::flag::start_transaction,
                          wait_until,
                          timed_out));
    switch (status)
    {
    case wsrep::provider::success:
        toi_mode_ = mode_;
        mode(lock, m_nbo);
        ret= 0;
        break;
    case wsrep::provider::error_certification_failed:
        override_error(e_deadlock_error, status);
        ret = 1;
        break;
    default:
        if (timed_out) {
            override_error(e_timeout_error);
        } else {
            override_error(e_error_during_commit, status);
        }
        ret = 1;
        break;
    }

    debug_log_state("begin_nbo_phase_one: leave");
    return ret;
}
611 
end_nbo_phase_one(const wsrep::mutable_buffer & err)612 int wsrep::client_state::end_nbo_phase_one(const wsrep::mutable_buffer& err)
613 {
614     debug_log_state("end_nbo_phase_one: enter");
615     assert(state_ == s_exec);
616     assert(mode_ == m_nbo);
617     assert(in_toi());
618 
619     enum wsrep::provider::status status(provider().leave_toi(id_, err));
620     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
621     int ret;
622     switch (status)
623     {
624     case wsrep::provider::success:
625         ret = 0;
626         break;
627     default:
628         override_error(e_error_during_commit, status);
629         ret = 1;
630         break;
631     }
632     nbo_meta_ = toi_meta_;
633     toi_meta_ = wsrep::ws_meta();
634     toi_mode_ = m_undefined;
635     debug_log_state("end_nbo_phase_one: leave");
636     return ret;
637 }
638 
enter_nbo_mode(const wsrep::ws_meta & ws_meta)639 int wsrep::client_state::enter_nbo_mode(const wsrep::ws_meta& ws_meta)
640 {
641     assert(state_ == s_exec);
642     assert(mode_ == m_local);
643     assert(toi_mode_ == m_undefined);
644     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
645     nbo_meta_ = ws_meta;
646     mode(lock, m_nbo);
647     return 0;
648 }
649 
// Begin phase two of a non-blocking operation: re-enter TOI with
// commit flag, ordered by the GTID of NBO begin stored in nbo_meta_.
// On failure the NBO mode is left and the DBMS must handle the error.
// Returns 0 on success, 1 on error.
int wsrep::client_state::begin_nbo_phase_two(
    const wsrep::key_array& keys,
    std::chrono::time_point<wsrep::clock> wait_until)
{
    debug_log_state("begin_nbo_phase_two: enter");
    debug_log_keys(keys);
    assert(state_ == s_exec);
    assert(mode_ == m_nbo);
    assert(toi_mode_ == m_undefined);
    assert(!in_toi());

    wsrep::unique_lock<wsrep::mutex> lock(mutex_);
    // Note: nbo_meta_ is passed to enter_toi() as it is
    // an input param containing gtid of NBO begin.
    // Output stored in nbo_meta_ is copied to toi_meta_ for
    // phase two end.
    bool timed_out;
    enum wsrep::provider::status status(
        poll_enter_toi(lock, keys,
                       wsrep::const_buffer(),
                       nbo_meta_,
                       wsrep::provider::flag::commit,
                       wait_until,
                       timed_out));
    int ret;
    switch (status)
    {
    case wsrep::provider::success:
        ret= 0;
        toi_meta_ = nbo_meta_;
        toi_mode_ = m_local;
        break;
    case wsrep::provider::error_provider_failed:
        override_error(e_interrupted_error, status);
        ret= 1;
        break;
    default:
        if (timed_out)
        {
            override_error(e_timeout_error, status);
        }
        else
        {
            override_error(e_error_during_commit, status);
        }
        ret= 1;
        break;
    }

    // Failed to grab TOI for completing NBO in order. This means that
    // the operation cannot be ended in total order, so we end the
    // NBO mode and let the DBMS to deal with the error.
    if (ret)
    {
        mode(lock, m_local);
        nbo_meta_ = wsrep::ws_meta();
    }

    debug_log_state("begin_nbo_phase_two: leave");
    return ret;
}
711 
end_nbo_phase_two(const wsrep::mutable_buffer & err)712 int wsrep::client_state::end_nbo_phase_two(const wsrep::mutable_buffer& err)
713 {
714     debug_log_state("end_nbo_phase_two: enter");
715     assert(state_ == s_exec);
716     assert(mode_ == m_nbo);
717     assert(toi_mode_ == m_local);
718     assert(in_toi());
719     enum wsrep::provider::status status(
720         provider().leave_toi(id_, err));
721     wsrep::unique_lock<wsrep::mutex> lock(mutex_);
722     int ret;
723     switch (status)
724     {
725     case wsrep::provider::success:
726         ret = 0;
727         break;
728     default:
729         override_error(e_error_during_commit, status);
730         ret = 1;
731         break;
732     }
733     toi_meta_ = wsrep::ws_meta();
734     toi_mode_ = m_undefined;
735     nbo_meta_ = wsrep::ws_meta();
736     mode(lock, m_local);
737     debug_log_state("end_nbo_phase_two: leave");
738     return ret;
739 }
740 ///////////////////////////////////////////////////////////////////////////////
741 //                                 Misc                                      //
742 ///////////////////////////////////////////////////////////////////////////////
743 
sync_wait(int timeout)744 int wsrep::client_state::sync_wait(int timeout)
745 {
746     std::pair<wsrep::gtid, enum wsrep::provider::status> result(
747         server_state_.causal_read(timeout));
748     int ret(1);
749     switch (result.second)
750     {
751     case wsrep::provider::success:
752         sync_wait_gtid_ = result.first;
753         ret = 0;
754         break;
755     case wsrep::provider::error_not_implemented:
756         override_error(wsrep::e_not_supported_error);
757         break;
758     default:
759         override_error(wsrep::e_timeout_error);
760         break;
761     }
762     return ret;
763 }
764 
765 ///////////////////////////////////////////////////////////////////////////////
766 //                               Private                                     //
767 ///////////////////////////////////////////////////////////////////////////////
768 
// Record the calling thread as the owner of this client state.
// The caller must hold the client state mutex.
void wsrep::client_state::do_acquire_ownership(
    wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED)
{
    assert(lock.owns_lock());
    // Be strict about client state for clients in local mode. The
    // owning_thread_id_ is used to detect bugs which are caused by
    // more than one thread operating the client state at the time,
    // for example thread handling the client session and background
    // rollbacker.
    assert(state_ == s_idle || mode_ != m_local);
    owning_thread_id_ = wsrep::this_thread::get_id();
}
781 
// Wait until a background rollbacker has finished, then take over
// ownership and move to s_exec. The caller must hold the mutex and
// the state must be s_idle.
void wsrep::client_state::do_wait_rollback_complete_and_acquire_ownership(
    wsrep::unique_lock<wsrep::mutex>& lock)
{
    assert(lock.owns_lock());
    assert(state_ == s_idle);
    // cond_ is signalled by sync_rollback_complete().
    while (is_rollbacker_active())
    {
        cond_.wait(lock);
    }
    do_acquire_ownership(lock);
    state(lock, s_exec);
}
794 
update_last_written_gtid(const wsrep::gtid & gtid)795 void wsrep::client_state::update_last_written_gtid(const wsrep::gtid& gtid)
796 {
797     assert(last_written_gtid_.is_undefined() ||
798            (last_written_gtid_.id() == gtid.id() &&
799             !(last_written_gtid_.seqno() > gtid.seqno())));
800     last_written_gtid_ = gtid;
801 }
802 
// Log the current client state (id, state, mode, error, TOI/NBO
// seqnos) at client state debug level, prefixed with the given
// context string.
void wsrep::client_state::debug_log_state(const char* context) const
{
    WSREP_LOG_DEBUG(debug_log_level(),
                    wsrep::log::debug_level_client_state,
                    context
                    << "(" << id_.get()
                    << "," << to_c_string(state_)
                    << "," << to_c_string(mode_)
                    << "," << wsrep::to_string(current_error_)
                    << "," << current_error_status_
                    << ",toi: " << toi_meta_.seqno()
                    << ",nbo: " << nbo_meta_.seqno() << ")");
}
816 
debug_log_keys(const wsrep::key_array & keys) const817 void wsrep::client_state::debug_log_keys(const wsrep::key_array& keys) const
818 {
819     for (size_t i(0); i < keys.size(); ++i)
820     {
821         WSREP_LOG_DEBUG(debug_log_level(),
822                         wsrep::log::debug_level_client_state,
823                         "TOI keys: "
824                         << " id: " << id_
825                         << "key: " << keys[i]);
826     }
827 }
828 
state(wsrep::unique_lock<wsrep::mutex> & lock WSREP_UNUSED,enum wsrep::client_state::state state)829 void wsrep::client_state::state(
830     wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
831     enum wsrep::client_state::state state)
832 {
833     // Verify that the current thread has gained control to the
834     // connection by calling before_command()
835     assert(wsrep::this_thread::get_id() == owning_thread_id_);
836     assert(lock.owns_lock());
837     static const char allowed[state_max_][state_max_] =
838         {
839             /* none idle exec result quit */
840             {  0,   1,   0,   0,     0}, /* none */
841             {  0,   0,   1,   0,     1}, /* idle */
842             {  0,   0,   0,   1,     0}, /* exec */
843             {  0,   1,   0,   0,     0}, /* result */
844             {  1,   0,   0,   0,     0}  /* quit */
845         };
846     if (!allowed[state_][state])
847     {
848         wsrep::log_warning() << "client_state: Unallowed state transition: "
849                              << state_ << " -> " << state;
850         assert(0);
851     }
852     state_hist_.push_back(state_);
853     state_ = state;
854     if (state_hist_.size() > 10)
855     {
856         state_hist_.erase(state_hist_.begin());
857     }
858 
859 }
860 
mode(wsrep::unique_lock<wsrep::mutex> & lock WSREP_UNUSED,enum mode mode)861 void wsrep::client_state::mode(
862     wsrep::unique_lock<wsrep::mutex>& lock WSREP_UNUSED,
863     enum mode mode)
864 {
865     assert(lock.owns_lock());
866 
867     static const char allowed[n_modes_][n_modes_] =
868         {   /* u  l  h  t  r  n */
869             {  0, 0, 0, 0, 0, 0 }, /* undefined */
870             {  0, 0, 1, 1, 1, 1 }, /* local */
871             {  0, 1, 0, 1, 0, 1 }, /* high prio */
872             {  0, 1, 1, 0, 0, 0 }, /* toi */
873             {  0, 1, 0, 0, 0, 0 }, /* rsu */
874             {  0, 1, 1, 0, 0, 0 }  /* nbo */
875         };
876     if (!allowed[mode_][mode])
877     {
878         wsrep::log_warning() << "client_state: Unallowed mode transition: "
879                              << mode_ << " -> " << mode;
880         assert(0);
881     }
882     mode_ = mode;
883 }
884