1 /*
2 simple ctdb benchmark
3
4 Copyright (C) Amitay Isaacs 2015
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22
23 #include "lib/util/debug.h"
24 #include "lib/util/tevent_unix.h"
25
26 #include "client/client.h"
27 #include "tests/src/test_options.h"
28 #include "tests/src/cluster_wait.h"
29
/* Name of the database that main() attaches to for this test. */
#define TESTDB "fetch_loop.tdb"

/* Key of the single record that every fetch-lock request asks for. */
#define TESTKEY "testkey"
32
33 struct fetch_loop_state {
34 struct tevent_context *ev;
35 struct ctdb_client_context *client;
36 struct ctdb_db_context *ctdb_db;
37 int num_nodes;
38 int timelimit;
39 TDB_DATA key;
40 int locks_count;
41 };
42
/*
 * Sub-request completion callbacks; each one receives the finished
 * sub-request and recovers the parent request from its callback data.
 */

/* Cluster wait finished: start the fetch-lock loop and the timers. */
static void fetch_loop_start(struct tevent_req *subreq);
/* A fetch-lock completed: bump the counter and request the lock again. */
static void fetch_loop_next(struct tevent_req *subreq);
/* One-second timer fired: print the running counter and re-arm. */
static void fetch_loop_each_second(struct tevent_req *subreq);
/* Time-limit timer fired: print the final counter and finish. */
static void fetch_loop_finish(struct tevent_req *subreq);
47
/*
 * Create the fetch loop request.
 *
 * The caller's parameters are copied into a freshly created
 * fetch_loop_state, the record key is set to TESTKEY, and a cluster
 * wait is started; fetch_loop_start() is invoked when that wait
 * completes.
 *
 * Returns NULL when the request itself cannot be created.  When the
 * cluster wait sub-request cannot be allocated, the request is marked
 * via tevent_req_nomem() and handed to tevent_req_post().
 */
static struct tevent_req *fetch_loop_send(TALLOC_CTX *mem_ctx,
					  struct tevent_context *ev,
					  struct ctdb_client_context *client,
					  struct ctdb_db_context *ctdb_db,
					  int num_nodes, int timelimit)
{
	struct fetch_loop_state *state;
	struct tevent_req *req;
	struct tevent_req *wait_req;

	req = tevent_req_create(mem_ctx, &state, struct fetch_loop_state);
	if (req == NULL) {
		return NULL;
	}

	/* The key points at the TESTKEY literal; it is never written to. */
	state->key.dsize = strlen(TESTKEY);
	state->key.dptr = discard_const(TESTKEY);

	state->timelimit = timelimit;
	state->num_nodes = num_nodes;
	state->ctdb_db = ctdb_db;
	state->client = client;
	state->ev = ev;

	wait_req = cluster_wait_send(state, ev, client, num_nodes);
	if (tevent_req_nomem(wait_req, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(wait_req, fetch_loop_start, req);

	return req;
}
79
fetch_loop_start(struct tevent_req * subreq)80 static void fetch_loop_start(struct tevent_req *subreq)
81 {
82 struct tevent_req *req = tevent_req_callback_data(
83 subreq, struct tevent_req);
84 struct fetch_loop_state *state = tevent_req_data(
85 req, struct fetch_loop_state);
86 bool status;
87 int ret;
88
89 status = cluster_wait_recv(subreq, &ret);
90 TALLOC_FREE(subreq);
91 if (! status) {
92 tevent_req_error(req, ret);
93 return;
94 }
95
96 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
97 state->ctdb_db, state->key, false);
98 if (tevent_req_nomem(subreq, req)) {
99 return;
100 }
101 tevent_req_set_callback(subreq, fetch_loop_next, req);
102
103 if (ctdb_client_pnn(state->client) == 0) {
104 subreq = tevent_wakeup_send(state, state->ev,
105 tevent_timeval_current_ofs(1, 0));
106 if (tevent_req_nomem(subreq, req)) {
107 return;
108 }
109 tevent_req_set_callback(subreq, fetch_loop_each_second, req);
110 }
111
112 subreq = tevent_wakeup_send(state, state->ev,
113 tevent_timeval_current_ofs(
114 state->timelimit, 0));
115 if (tevent_req_nomem(subreq, req)) {
116 return;
117 }
118 tevent_req_set_callback(subreq, fetch_loop_finish, req);
119 }
120
fetch_loop_next(struct tevent_req * subreq)121 static void fetch_loop_next(struct tevent_req *subreq)
122 {
123 struct tevent_req *req = tevent_req_callback_data(
124 subreq, struct tevent_req);
125 struct fetch_loop_state *state = tevent_req_data(
126 req, struct fetch_loop_state);
127 struct ctdb_record_handle *h;
128 TDB_DATA data;
129 int ret;
130
131 h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
132 TALLOC_FREE(subreq);
133 if (h == NULL) {
134 tevent_req_error(req, ret);
135 return;
136 }
137
138 if (data.dsize == sizeof(uint32_t)) {
139 state->locks_count = *(uint32_t *)data.dptr;
140 }
141 TALLOC_FREE(data.dptr);
142
143 state->locks_count += 1;
144 data.dsize = sizeof(uint32_t);
145 data.dptr = (uint8_t *)&state->locks_count;
146
147 ret = ctdb_store_record(h, data);
148 if (ret != 0) {
149 talloc_free(h);
150 tevent_req_error(req, ret);
151 return;
152 }
153
154 talloc_free(h);
155
156 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
157 state->ctdb_db, state->key, false);
158 if (tevent_req_nomem(subreq, req)) {
159 return;
160 }
161 tevent_req_set_callback(subreq, fetch_loop_next, req);
162 }
163
fetch_loop_each_second(struct tevent_req * subreq)164 static void fetch_loop_each_second(struct tevent_req *subreq)
165 {
166 struct tevent_req *req = tevent_req_callback_data(
167 subreq, struct tevent_req);
168 struct fetch_loop_state *state = tevent_req_data(
169 req, struct fetch_loop_state);
170 bool status;
171
172 status = tevent_wakeup_recv(subreq);
173 TALLOC_FREE(subreq);
174 if (! status) {
175 tevent_req_error(req, EIO);
176 return;
177 }
178
179 printf("Locks:%d\r", state->locks_count);
180 fflush(stdout);
181
182 subreq = tevent_wakeup_send(state, state->ev,
183 tevent_timeval_current_ofs(1, 0));
184 if (tevent_req_nomem(subreq, req)) {
185 return;
186 }
187 tevent_req_set_callback(subreq, fetch_loop_each_second, req);
188 }
189
fetch_loop_finish(struct tevent_req * subreq)190 static void fetch_loop_finish(struct tevent_req *subreq)
191 {
192 struct tevent_req *req = tevent_req_callback_data(
193 subreq, struct tevent_req);
194 struct fetch_loop_state *state = tevent_req_data(
195 req, struct fetch_loop_state);
196 bool status;
197
198 status = tevent_wakeup_recv(subreq);
199 TALLOC_FREE(subreq);
200 if (! status) {
201 tevent_req_error(req, EIO);
202 return;
203 }
204
205 printf("Locks:%d\n", state->locks_count);
206
207 tevent_req_done(req);
208 }
209
/*
 * Collect the result of a fetch loop request.
 *
 * Returns true when the request carries no error.  Otherwise returns
 * false and, if perr is not NULL, stores the unix error code in *perr.
 */
static bool fetch_loop_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
222
main(int argc,const char * argv[])223 int main(int argc, const char *argv[])
224 {
225 const struct test_options *opts;
226 TALLOC_CTX *mem_ctx;
227 struct tevent_context *ev;
228 struct ctdb_client_context *client;
229 struct ctdb_db_context *ctdb_db;
230 struct tevent_req *req;
231 int ret;
232 bool status;
233
234 setup_logging("fetch_loop", DEBUG_STDERR);
235
236 status = process_options_basic(argc, argv, &opts);
237 if (! status) {
238 exit(1);
239 }
240
241 mem_ctx = talloc_new(NULL);
242 if (mem_ctx == NULL) {
243 fprintf(stderr, "Memory allocation error\n");
244 exit(1);
245 }
246
247 ev = tevent_context_init(mem_ctx);
248 if (ev == NULL) {
249 fprintf(stderr, "Memory allocation error\n");
250 exit(1);
251 }
252
253 ret = ctdb_client_init(mem_ctx, ev, opts->socket, &client);
254 if (ret != 0) {
255 fprintf(stderr, "Failed to initialize client, ret=%d\n", ret);
256 exit(1);
257 }
258
259 if (! ctdb_recovery_wait(ev, client)) {
260 fprintf(stderr, "Memory allocation error\n");
261 exit(1);
262 }
263
264 ret = ctdb_attach(ev, client, tevent_timeval_zero(), TESTDB, 0,
265 &ctdb_db);
266 if (ret != 0) {
267 fprintf(stderr, "Failed to attach to DB %s\n", TESTDB);
268 exit(1);
269 }
270
271 req = fetch_loop_send(mem_ctx, ev, client, ctdb_db,
272 opts->num_nodes, opts->timelimit);
273 if (req == NULL) {
274 fprintf(stderr, "Memory allocation error\n");
275 exit(1);
276 }
277
278 tevent_req_poll(req, ev);
279
280 status = fetch_loop_recv(req, &ret);
281 if (! status) {
282 fprintf(stderr, "fetch loop test failed\n");
283 exit(1);
284 }
285
286 talloc_free(mem_ctx);
287 return 0;
288 }
289