1 /*
2  * Copyright (C) 2018-2019 Codership Oy <info@codership.com>
3  *
4  * This file is part of wsrep-lib.
5  *
6  * Wsrep-lib is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * Wsrep-lib is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with wsrep-lib.  If not, see <https://www.gnu.org/licenses/>.
18  */
19 
20 #include "db_high_priority_service.hpp"
21 #include "db_server.hpp"
22 #include "db_client.hpp"
23 
high_priority_service(db::server & server,db::client & client)24 db::high_priority_service::high_priority_service(
25     db::server& server, db::client& client)
26     : wsrep::high_priority_service(server.server_state())
27     , server_(server)
28     , client_(client)
29 { }
30 
start_transaction(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta)31 int db::high_priority_service::start_transaction(
32     const wsrep::ws_handle& ws_handle,
33     const wsrep::ws_meta& ws_meta)
34 {
35     return client_.client_state().start_transaction(ws_handle, ws_meta);
36 }
37 
next_fragment(const wsrep::ws_meta & ws_meta)38 int db::high_priority_service::next_fragment(const wsrep::ws_meta& ws_meta)
39 {
40     return client_.client_state().next_fragment(ws_meta);
41 }
42 
transaction() const43 const wsrep::transaction& db::high_priority_service::transaction() const
44 {
45     return client_.client_state().transaction();
46 }
47 
adopt_transaction(const wsrep::transaction &)48 int db::high_priority_service::adopt_transaction(const wsrep::transaction&)
49 {
50     throw wsrep::not_implemented_error();
51 }
52 
apply_write_set(const wsrep::ws_meta &,const wsrep::const_buffer &,wsrep::mutable_buffer &)53 int db::high_priority_service::apply_write_set(
54     const wsrep::ws_meta&,
55     const wsrep::const_buffer&,
56     wsrep::mutable_buffer&)
57 {
58     client_.se_trx_.start(&client_);
59     client_.se_trx_.apply(client_.client_state().transaction());
60     return 0;
61 }
62 
apply_toi(const wsrep::ws_meta &,const wsrep::const_buffer &,wsrep::mutable_buffer &)63 int db::high_priority_service::apply_toi(
64     const wsrep::ws_meta&,
65     const wsrep::const_buffer&,
66     wsrep::mutable_buffer&)
67 {
68     throw wsrep::not_implemented_error();
69 }
70 
apply_nbo_begin(const wsrep::ws_meta &,const wsrep::const_buffer &,wsrep::mutable_buffer &)71 int db::high_priority_service::apply_nbo_begin(
72     const wsrep::ws_meta&,
73     const wsrep::const_buffer&,
74     wsrep::mutable_buffer&)
75 {
76     throw wsrep::not_implemented_error();
77 }
78 
commit(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta)79 int db::high_priority_service::commit(const wsrep::ws_handle& ws_handle,
80                                       const wsrep::ws_meta& ws_meta)
81 {
82     client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true);
83     int ret(client_.client_state_.before_commit());
84     if (ret == 0) client_.se_trx_.commit(ws_meta.gtid());
85     ret = ret || client_.client_state_.ordered_commit();
86     ret = ret || client_.client_state_.after_commit();
87     return ret;
88 }
89 
rollback(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta)90 int db::high_priority_service::rollback(const wsrep::ws_handle& ws_handle,
91                                         const wsrep::ws_meta& ws_meta)
92 {
93     client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, false);
94     int ret(client_.client_state_.before_rollback());
95     assert(ret == 0);
96     client_.se_trx_.rollback();
97     ret = client_.client_state_.after_rollback();
98     assert(ret == 0);
99     return ret;
100 }
101 
adopt_apply_error(wsrep::mutable_buffer & err)102 void db::high_priority_service::adopt_apply_error(wsrep::mutable_buffer& err)
103 {
104     client_.client_state_.adopt_apply_error(err);
105 }
106 
after_apply()107 void db::high_priority_service::after_apply()
108 {
109     client_.client_state_.after_applying();
110 }
111 
log_dummy_write_set(const wsrep::ws_handle & ws_handle,const wsrep::ws_meta & ws_meta,wsrep::mutable_buffer & err)112 int db::high_priority_service::log_dummy_write_set(
113     const wsrep::ws_handle& ws_handle,
114     const wsrep::ws_meta& ws_meta,
115     wsrep::mutable_buffer& err)
116 {
117     int ret(client_.client_state_.start_transaction(ws_handle, ws_meta));
118     assert(ret == 0);
119     if (ws_meta.ordered())
120     {
121         client_.client_state_.adopt_apply_error(err);
122         client_.client_state_.prepare_for_ordering(ws_handle, ws_meta, true);
123         ret = client_.client_state_.before_commit();
124         assert(ret == 0);
125         ret = client_.client_state_.ordered_commit();
126         assert(ret == 0);
127         ret = client_.client_state_.after_commit();
128         assert(ret == 0);
129     }
130     client_.client_state_.after_applying();
131     return ret;
132 }
133 
is_replaying() const134 bool db::high_priority_service::is_replaying() const
135 {
136     return false;
137 }
138