1 /*
2    simple ctdb benchmark
3 
4    Copyright (C) Amitay Isaacs  2015
5 
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10 
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15 
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 
20 #include "replace.h"
21 #include "system/network.h"
22 
23 #include "lib/util/debug.h"
24 #include "lib/util/tevent_unix.h"
25 
26 #include "client/client.h"
27 #include "tests/src/test_options.h"
28 #include "tests/src/cluster_wait.h"
29 
/* Name of the database that main() attaches to for the benchmark */
#define TESTDB	"fetch_loop.tdb"
/* Key of the single record that the loop fetch-locks and updates */
#define TESTKEY	"testkey"
32 
/*
  Per-request state shared by all callbacks of a fetch_loop request
*/
struct fetch_loop_state {
	/* event context on which every sub-request is scheduled */
	struct tevent_context *ev;
	/* ctdb client connection used for cluster wait and fetch-lock */
	struct ctdb_client_context *client;
	/* database handle passed to ctdb_fetch_lock_send() */
	struct ctdb_db_context *ctdb_db;
	/* node count handed to cluster_wait_send() */
	int num_nodes;
	/* seconds after which fetch_loop_finish() completes the request */
	int timelimit;
	/* record key: the bytes of TESTKEY without the trailing NUL */
	TDB_DATA key;
	/*
	  counter loaded from the record (when it holds exactly
	  sizeof(uint32_t) bytes), incremented and stored back on every
	  fetch-lock; also the value printed as "Locks:"
	*/
	int locks_count;
};
42 
/*
  Callbacks of the fetch_loop request, declared up front so they can be
  registered with tevent_req_set_callback() before their definitions:
    fetch_loop_start       - runs when the cluster wait completes
    fetch_loop_next        - runs for every completed fetch-lock
    fetch_loop_each_second - periodic one second progress report
    fetch_loop_finish      - runs when the time limit expires
*/
static void fetch_loop_start(struct tevent_req *subreq);
static void fetch_loop_next(struct tevent_req *subreq);
static void fetch_loop_each_second(struct tevent_req *subreq);
static void fetch_loop_finish(struct tevent_req *subreq);
47 
/*
  Create a fetch_loop request.

  The arguments are recorded in the request state, the record key is set
  to the bytes of TESTKEY (no trailing NUL), and a cluster_wait request is
  issued; fetch_loop_start() continues once that completes.

  Returns NULL if the request itself cannot be created.  If the
  cluster_wait request cannot be allocated, the request is marked as
  failed and posted on ev.
*/
static struct tevent_req *fetch_loop_send(TALLOC_CTX *mem_ctx,
					  struct tevent_context *ev,
					  struct ctdb_client_context *client,
					  struct ctdb_db_context *ctdb_db,
					  int num_nodes, int timelimit)
{
	struct fetch_loop_state *state;
	struct tevent_req *req;
	struct tevent_req *wait_req;

	req = tevent_req_create(mem_ctx, &state, struct fetch_loop_state);
	if (req == NULL) {
		return NULL;
	}

	state->ev = ev;
	state->client = client;
	state->ctdb_db = ctdb_db;
	state->num_nodes = num_nodes;
	state->timelimit = timelimit;
	state->key = (TDB_DATA) {
		.dptr = discard_const(TESTKEY),
		.dsize = strlen(TESTKEY),
	};

	/* the sub-request is parented to state, as are all later ones */
	wait_req = cluster_wait_send(state, ev, client, num_nodes);
	if (tevent_req_nomem(wait_req, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(wait_req, fetch_loop_start, req);

	return req;
}
79 
fetch_loop_start(struct tevent_req * subreq)80 static void fetch_loop_start(struct tevent_req *subreq)
81 {
82 	struct tevent_req *req = tevent_req_callback_data(
83 		subreq, struct tevent_req);
84 	struct fetch_loop_state *state = tevent_req_data(
85 		req, struct fetch_loop_state);
86 	bool status;
87 	int ret;
88 
89 	status = cluster_wait_recv(subreq, &ret);
90 	TALLOC_FREE(subreq);
91 	if (! status) {
92 		tevent_req_error(req, ret);
93 		return;
94 	}
95 
96 	subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
97 				      state->ctdb_db, state->key, false);
98 	if (tevent_req_nomem(subreq, req)) {
99 		return;
100 	}
101 	tevent_req_set_callback(subreq, fetch_loop_next, req);
102 
103 	if (ctdb_client_pnn(state->client) == 0) {
104 		subreq = tevent_wakeup_send(state, state->ev,
105 					    tevent_timeval_current_ofs(1, 0));
106 		if (tevent_req_nomem(subreq, req)) {
107 			return;
108 		}
109 		tevent_req_set_callback(subreq, fetch_loop_each_second, req);
110 	}
111 
112 	subreq = tevent_wakeup_send(state, state->ev,
113 				    tevent_timeval_current_ofs(
114 					    state->timelimit, 0));
115 	if (tevent_req_nomem(subreq, req)) {
116 		return;
117 	}
118 	tevent_req_set_callback(subreq, fetch_loop_finish, req);
119 }
120 
fetch_loop_next(struct tevent_req * subreq)121 static void fetch_loop_next(struct tevent_req *subreq)
122 {
123 	struct tevent_req *req = tevent_req_callback_data(
124 		subreq, struct tevent_req);
125 	struct fetch_loop_state *state = tevent_req_data(
126 		req, struct fetch_loop_state);
127 	struct ctdb_record_handle *h;
128 	TDB_DATA data;
129 	int ret;
130 
131 	h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
132 	TALLOC_FREE(subreq);
133 	if (h == NULL) {
134 		tevent_req_error(req, ret);
135 		return;
136 	}
137 
138 	if (data.dsize == sizeof(uint32_t)) {
139 		state->locks_count = *(uint32_t *)data.dptr;
140 	}
141 	TALLOC_FREE(data.dptr);
142 
143 	state->locks_count += 1;
144 	data.dsize = sizeof(uint32_t);
145 	data.dptr = (uint8_t *)&state->locks_count;
146 
147 	ret = ctdb_store_record(h, data);
148 	if (ret != 0) {
149 		talloc_free(h);
150 		tevent_req_error(req, ret);
151 		return;
152 	}
153 
154 	talloc_free(h);
155 
156 	subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
157 				      state->ctdb_db, state->key, false);
158 	if (tevent_req_nomem(subreq, req)) {
159 		return;
160 	}
161 	tevent_req_set_callback(subreq, fetch_loop_next, req);
162 }
163 
fetch_loop_each_second(struct tevent_req * subreq)164 static void fetch_loop_each_second(struct tevent_req *subreq)
165 {
166 	struct tevent_req *req = tevent_req_callback_data(
167 		subreq, struct tevent_req);
168 	struct fetch_loop_state *state = tevent_req_data(
169 		req, struct fetch_loop_state);
170 	bool status;
171 
172 	status = tevent_wakeup_recv(subreq);
173 	TALLOC_FREE(subreq);
174 	if (! status) {
175 		tevent_req_error(req, EIO);
176 		return;
177 	}
178 
179 	printf("Locks:%d\r", state->locks_count);
180 	fflush(stdout);
181 
182 	subreq = tevent_wakeup_send(state, state->ev,
183 				    tevent_timeval_current_ofs(1, 0));
184 	if (tevent_req_nomem(subreq, req)) {
185 		return;
186 	}
187 	tevent_req_set_callback(subreq, fetch_loop_each_second, req);
188 }
189 
fetch_loop_finish(struct tevent_req * subreq)190 static void fetch_loop_finish(struct tevent_req *subreq)
191 {
192 	struct tevent_req *req = tevent_req_callback_data(
193 		subreq, struct tevent_req);
194 	struct fetch_loop_state *state = tevent_req_data(
195 		req, struct fetch_loop_state);
196 	bool status;
197 
198 	status = tevent_wakeup_recv(subreq);
199 	TALLOC_FREE(subreq);
200 	if (! status) {
201 		tevent_req_error(req, EIO);
202 		return;
203 	}
204 
205 	printf("Locks:%d\n", state->locks_count);
206 
207 	tevent_req_done(req);
208 }
209 
/*
  Collect the result of a fetch_loop request.

  Returns true when the request carries no error.  Otherwise returns
  false and, if perr is not NULL, stores the unix error code in *perr.
*/
static bool fetch_loop_recv(struct tevent_req *req, int *perr)
{
	int unix_err;

	if (!tevent_req_is_unix_error(req, &unix_err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
222 
main(int argc,const char * argv[])223 int main(int argc, const char *argv[])
224 {
225 	const struct test_options *opts;
226 	TALLOC_CTX *mem_ctx;
227 	struct tevent_context *ev;
228 	struct ctdb_client_context *client;
229 	struct ctdb_db_context *ctdb_db;
230 	struct tevent_req *req;
231 	int ret;
232 	bool status;
233 
234 	setup_logging("fetch_loop", DEBUG_STDERR);
235 
236 	status = process_options_basic(argc, argv, &opts);
237 	if (! status) {
238 		exit(1);
239 	}
240 
241 	mem_ctx = talloc_new(NULL);
242 	if (mem_ctx == NULL) {
243 		fprintf(stderr, "Memory allocation error\n");
244 		exit(1);
245 	}
246 
247 	ev = tevent_context_init(mem_ctx);
248 	if (ev == NULL) {
249 		fprintf(stderr, "Memory allocation error\n");
250 		exit(1);
251 	}
252 
253 	ret = ctdb_client_init(mem_ctx, ev, opts->socket, &client);
254 	if (ret != 0) {
255 		fprintf(stderr, "Failed to initialize client, ret=%d\n", ret);
256 		exit(1);
257 	}
258 
259 	if (! ctdb_recovery_wait(ev, client)) {
260 		fprintf(stderr, "Memory allocation error\n");
261 		exit(1);
262 	}
263 
264 	ret = ctdb_attach(ev, client, tevent_timeval_zero(), TESTDB, 0,
265 			  &ctdb_db);
266 	if (ret != 0) {
267 		fprintf(stderr, "Failed to attach to DB %s\n", TESTDB);
268 		exit(1);
269 	}
270 
271 	req = fetch_loop_send(mem_ctx, ev, client, ctdb_db,
272 			       opts->num_nodes, opts->timelimit);
273 	if (req == NULL) {
274 		fprintf(stderr, "Memory allocation error\n");
275 		exit(1);
276 	}
277 
278 	tevent_req_poll(req, ev);
279 
280 	status = fetch_loop_recv(req, &ret);
281 	if (! status) {
282 		fprintf(stderr, "fetch loop test failed\n");
283 		exit(1);
284 	}
285 
286 	talloc_free(mem_ctx);
287 	return 0;
288 }
289