1 /* Copyright (c) 2019-2020, Codership Oy. All rights reserved.
2 *
3 * This program is free software; you can redistribute it and/or modify
4 * it under the terms of the GNU General Public License as published by
5 * the Free Software Foundation; version 2 of the License.
6 *
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
11 *
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
15 */
16
17 #include "trx.h"
18 #include "log.h"
19
20 #include <assert.h>
21 #include <errno.h> // ENOMEM, etc.
22 #include <stdbool.h>
23
24 wsrep_status_t
node_trx_execute(node_store_t * const store,wsrep_t * const wsrep,wsrep_conn_id_t const conn_id,int ops_num)25 node_trx_execute(node_store_t* const store,
26 wsrep_t* const wsrep,
27 wsrep_conn_id_t const conn_id,
28 int ops_num)
29 {
30 wsrep_status_t cert = WSREP_OK; // for cleanup
31
32 static unsigned int const ws_flags =
33 WSREP_FLAG_TRX_START | WSREP_FLAG_TRX_END; // atomic trx
34 wsrep_trx_meta_t ws_meta;
35 wsrep_status_t ret = WSREP_OK;
36
37 /* prepare simple transaction and obtain a writeset handle for it */
38 wsrep_ws_handle_t ws_handle = { 0, NULL };
39 while (ops_num--)
40 {
41 if (0 != (ret = node_store_execute(store, wsrep, &ws_handle)))
42 {
43 #if 0
44 NODE_INFO("master [%d]: node_store_execute() returned %d",
45 conn_id, ret);
46 #endif
47 ret = WSREP_TRX_FAIL;
48 goto cleanup;
49 }
50 }
51
52 /* REPLICATION: (replicate and) certify the writeset (pointed to by
53 * ws_handle) with the cluster */
54 cert = wsrep->certify(wsrep, conn_id, &ws_handle, ws_flags, &ws_meta);
55
56 if (WSREP_BF_ABORT == cert)
57 {
58 /* REPLICATION: transaction was signaled to abort due to multi-master
59 * conflict. It must rollback immediately: it blocks
60 * transaction that was ordered earlier and will never
61 * be able to enter commit order. */
62 node_store_rollback(store, ws_handle.trx_id);
63 }
64
65 /* REPLICATION: writeset was totally ordered, need to enter commit order */
66 if (ws_meta.gtid.seqno > 0)
67 {
68 ret = wsrep->commit_order_enter(wsrep, &ws_handle, &ws_meta);
69 if (ret)
70 {
71 NODE_ERROR("master [%d]: wsrep::commit_order_enter(%lld) failed: "
72 "%d", (long long)(ws_meta.gtid.seqno), ret);
73 goto cleanup;
74 }
75
76 /* REPLICATION: inside commit monitor
77 * Note: we commit transaction only if certification succeded */
78 if (WSREP_OK == cert)
79 node_store_commit(store, ws_handle.trx_id, &ws_meta.gtid);
80 else
81 node_store_update_gtid(store, &ws_meta.gtid);
82
83 ret = wsrep->commit_order_leave(wsrep, &ws_handle, &ws_meta, NULL);
84 if (ret)
85 {
86 NODE_ERROR("master [%d]: wsrep::commit_order_leave(%lld) failed: "
87 "%d", (long long)(ws_meta.gtid.seqno), ret);
88 goto cleanup;
89 }
90 }
91 else
92 {
93 assert(cert);
94 }
95
96 cleanup:
97 /* REPLICATION: if wsrep->certify() returned anything else but WSREP_OK
98 * transaction must roll back. BF aborted trx already did it. */
99 if (cert && WSREP_BF_ABORT != cert)
100 node_store_rollback(store, ws_handle.trx_id);
101
102 /* NOTE: this application follows the approach that resources must be freed
103 * at the same level where they were allocated, so it is assumed that
104 * ws_key and ws were deallocated in either commit or rollback calls.*/
105
106 /* REPLICATION: release provider resources associated with the trx */
107 wsrep->release(wsrep, &ws_handle);
108
109 return ret ? ret : cert;
110 }
111
112 wsrep_status_t
node_trx_apply(node_store_t * const store,wsrep_t * const wsrep,const wsrep_ws_handle_t * const ws_handle,const wsrep_trx_meta_t * const ws_meta,const wsrep_buf_t * const ws)113 node_trx_apply(node_store_t* const store,
114 wsrep_t* const wsrep,
115 const wsrep_ws_handle_t* const ws_handle,
116 const wsrep_trx_meta_t* const ws_meta,
117 const wsrep_buf_t* const ws)
118 {
119 /* no business being here if event was not ordered */
120 assert(ws_meta->gtid.seqno > 0);
121
122 wsrep_trx_id_t trx_id;
123 wsrep_buf_t err_buf = { NULL, 0 };
124 int app_err;
125 if (ws)
126 {
127 app_err = node_store_apply(store, &trx_id, ws);
128 if (app_err)
129 {
130 /* REPLICATION: if applying failed, prepare an error buffer with
131 * sufficient error specification */
132 err_buf.ptr = &app_err; // suppose error code is enough
133 err_buf.len = sizeof(app_err);
134 }
135 }
136 else /* ws failed certification and should be skipped */
137 {
138 /* just some non-0 code to choose node_store_update_gtid() below */
139 app_err = 1;
140 }
141
142 wsrep_status_t ret;
143 ret = wsrep->commit_order_enter(wsrep, ws_handle, ws_meta);
144 if (ret) {
145 node_store_rollback(store, trx_id);
146 return ret;
147 }
148
149 if (!app_err) node_store_commit(store, trx_id, &ws_meta->gtid);
150 else node_store_update_gtid(store, &ws_meta->gtid);
151
152 ret = wsrep->commit_order_leave(wsrep, ws_handle, ws_meta, &err_buf);
153
154 return ret;
155 }
156