1 /*
2 simple ctdb benchmark - send messages in a ring around cluster
3
4 Copyright (C) Amitay Isaacs 2015
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
#include "replace.h"
#include "system/network.h"

#include <string.h>

#include "lib/util/debug.h"
#include "lib/util/time.h"
#include "lib/util/tevent_unix.h"

#include "client/client.h"
#include "tests/src/test_options.h"
#include "tests/src/cluster_wait.h"
30
/* Server id (srvid) under which the benchmark message handler is registered */
#define MSG_ID_BENCH	0
32
/*
 * Per-request state of the message ring benchmark.
 */
struct message_ring_state {
	/* Event context driving all sub-requests */
	struct tevent_context *ev;
	/* Connection to the local ctdb daemon */
	struct ctdb_client_context *client;
	/* Size of the ring; used as the modulus when picking the next node */
	int num_nodes;
	/* Offset in seconds of the wakeup that ends the benchmark */
	int timelimit;
	/* When 1, node 0 prints the message rate on every one-second tick */
	int interactive;
	/* Number of benchmark messages accepted by the message handler */
	int msg_count;
	/* Accepted messages whose payload was 1 */
	int msg_plus;
	/* Accepted messages whose payload was anything other than 1 */
	int msg_minus;
	/* Taken once the cluster wait has completed; base for the rate */
	struct timeval start_time;
};
43
44 static void message_ring_wait(struct tevent_req *subreq);
45 static void message_ring_start(struct tevent_req *subreq);
46 static void message_ring_each_second(struct tevent_req *subreq);
47 static void message_ring_msg_sent(struct tevent_req *subreq);
48 static void message_ring_msg_handler(uint64_t srvid, TDB_DATA data,
49 void *private_data);
50 static void message_ring_finish(struct tevent_req *subreq);
51
/*
 * Create the message ring benchmark request.
 *
 * Stores the parameters in the request state and starts registering
 * message_ring_msg_handler() for MSG_ID_BENCH; the request continues in
 * message_ring_wait() once the registration completes.
 *
 * Returns NULL if the request itself cannot be created.  If the
 * registration sub-request cannot be allocated, the request is returned
 * after being handed to tevent_req_post().
 */
static struct tevent_req *message_ring_send(TALLOC_CTX *mem_ctx,
					    struct tevent_context *ev,
					    struct ctdb_client_context *client,
					    int num_nodes, int timelimit,
					    int interactive)
{
	struct message_ring_state *state;
	struct tevent_req *req;
	struct tevent_req *handler_req;

	req = tevent_req_create(mem_ctx, &state, struct message_ring_state);
	if (req == NULL) {
		return NULL;
	}

	state->ev = ev;
	state->client = client;
	state->num_nodes = num_nodes;
	state->timelimit = timelimit;
	state->interactive = interactive;

	/* The request is passed as private data to the message handler */
	handler_req = ctdb_client_set_message_handler_send(
					state, state->ev, state->client,
					MSG_ID_BENCH,
					message_ring_msg_handler, req);
	if (tevent_req_nomem(handler_req, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(handler_req, message_ring_wait, req);

	return req;
}
83
message_ring_wait(struct tevent_req * subreq)84 static void message_ring_wait(struct tevent_req *subreq)
85 {
86 struct tevent_req *req = tevent_req_callback_data(
87 subreq, struct tevent_req);
88 struct message_ring_state *state = tevent_req_data(
89 req, struct message_ring_state);
90 bool status;
91 int ret;
92
93 status = ctdb_client_set_message_handler_recv(subreq, &ret);
94 TALLOC_FREE(subreq);
95 if (! status) {
96 tevent_req_error(req, ret);
97 return;
98 }
99
100 subreq = cluster_wait_send(state, state->ev, state->client,
101 state->num_nodes);
102 if (tevent_req_nomem(subreq, req)) {
103 return;
104 }
105 tevent_req_set_callback(subreq, message_ring_start, req);
106 }
107
message_ring_start(struct tevent_req * subreq)108 static void message_ring_start(struct tevent_req *subreq)
109 {
110 struct tevent_req *req = tevent_req_callback_data(
111 subreq, struct tevent_req);
112 struct message_ring_state *state = tevent_req_data(
113 req, struct message_ring_state);
114 bool status;
115 int ret;
116
117 status = cluster_wait_recv(subreq, &ret);
118 TALLOC_FREE(subreq);
119 if (! status) {
120 tevent_req_error(req, ret);
121 return;
122 }
123
124 state->start_time = tevent_timeval_current();
125
126 if (ctdb_client_pnn(state->client) == 0) {
127 subreq = tevent_wakeup_send(state, state->ev,
128 tevent_timeval_current_ofs(1, 0));
129 if (tevent_req_nomem(subreq, req)) {
130 return;
131 }
132 tevent_req_set_callback(subreq, message_ring_each_second, req);
133 }
134
135 subreq = tevent_wakeup_send(state, state->ev,
136 tevent_timeval_current_ofs(
137 state->timelimit, 0));
138 if (tevent_req_nomem(subreq, req)) {
139 return;
140 }
141 tevent_req_set_callback(subreq, message_ring_finish, req);
142 }
143
/*
 * Return the ring neighbour of the local node in direction incr.
 *
 * num_nodes is added before incr is applied so that stepping backwards
 * (incr == -1) from node 0 wraps around to the last node of the ring.
 */
static uint32_t next_node(struct ctdb_client_context *client,
			  int num_nodes, int incr)
{
	uint32_t self = ctdb_client_pnn(client);

	return (self + num_nodes + incr) % num_nodes;
}
149
message_ring_each_second(struct tevent_req * subreq)150 static void message_ring_each_second(struct tevent_req *subreq)
151 {
152 struct tevent_req *req = tevent_req_callback_data(
153 subreq, struct tevent_req);
154 struct message_ring_state *state = tevent_req_data(
155 req, struct message_ring_state);
156 struct ctdb_req_message msg;
157 uint32_t pnn;
158 int incr;
159 bool status;
160
161 status = tevent_wakeup_recv(subreq);
162 TALLOC_FREE(subreq);
163 if (! status) {
164 tevent_req_error(req, EIO);
165 return;
166 }
167
168 pnn = ctdb_client_pnn(state->client);
169 if (pnn == 0 && state->interactive == 1) {
170 double t;
171
172 t = timeval_elapsed(&state->start_time);
173 printf("Ring[%u]: %.2f msgs/sec (+ve=%d -ve=%d)\n",
174 pnn, state->msg_count / t,
175 state->msg_plus, state->msg_minus);
176 fflush(stdout);
177 }
178
179 if (state->msg_plus == 0) {
180 incr = 1;
181
182 msg.srvid = 0;
183 msg.data.data.dptr = (uint8_t *)&incr;
184 msg.data.data.dsize = sizeof(incr);
185
186 pnn = next_node(state->client, state->num_nodes, incr);
187
188 subreq = ctdb_client_message_send(state, state->ev,
189 state->client, pnn, &msg);
190 if (tevent_req_nomem(subreq, req)) {
191 return;
192 }
193 tevent_req_set_callback(subreq, message_ring_msg_sent, req);
194 }
195
196 if (state->msg_minus == 0) {
197 incr = -1;
198
199 msg.srvid = 0;
200 msg.data.data.dptr = (uint8_t *)&incr;
201 msg.data.data.dsize = sizeof(incr);
202
203 pnn = next_node(state->client, state->num_nodes, incr);
204
205 subreq = ctdb_client_message_send(state, state->ev,
206 state->client, pnn, &msg);
207 if (tevent_req_nomem(subreq, req)) {
208 return;
209 }
210 tevent_req_set_callback(subreq, message_ring_msg_sent, req);
211 }
212
213 subreq = tevent_wakeup_send(state, state->ev,
214 tevent_timeval_current_ofs(1, 0));
215 if (tevent_req_nomem(subreq, req)) {
216 return;
217 }
218 tevent_req_set_callback(subreq, message_ring_each_second, req);
219 }
220
message_ring_msg_sent(struct tevent_req * subreq)221 static void message_ring_msg_sent(struct tevent_req *subreq)
222 {
223 struct tevent_req *req = tevent_req_callback_data(
224 subreq, struct tevent_req);
225 bool status;
226 int ret;
227
228 status = ctdb_client_message_recv(subreq, &ret);
229 TALLOC_FREE(subreq);
230 if (! status) {
231 tevent_req_error(req, ret);
232 }
233 }
234
/*
 * Handler for incoming benchmark messages.
 *
 * private_data is the benchmark request.  A valid message carries a single
 * int, 1 or -1, giving the direction it travels around the ring.  The
 * message is counted and then forwarded unchanged to the next node in the
 * same direction.  Messages with a different srvid, a payload of the wrong
 * size, or a direction other than 1 / -1 are silently ignored.
 */
static void message_ring_msg_handler(uint64_t srvid, TDB_DATA data,
				     void *private_data)
{
	struct tevent_req *req = talloc_get_type_abort(
		private_data, struct tevent_req);
	struct message_ring_state *state = tevent_req_data(
		req, struct message_ring_state);
	struct ctdb_req_message msg;
	struct tevent_req *subreq;
	int incr;
	uint32_t pnn;

	if (srvid != MSG_ID_BENCH) {
		return;
	}

	if (data.dptr == NULL || data.dsize != sizeof(incr)) {
		return;
	}

	/*
	 * The payload is a byte buffer with no alignment guarantee, so copy
	 * it out instead of dereferencing it through an int pointer.
	 */
	memcpy(&incr, data.dptr, sizeof(incr));

	/*
	 * Only 1 and -1 are ever sent by this benchmark; anything else is
	 * malformed and must neither be counted nor forwarded.
	 */
	if (incr != 1 && incr != -1) {
		return;
	}

	state->msg_count += 1;
	if (incr == 1) {
		state->msg_plus += 1;
	} else {
		state->msg_minus += 1;
	}

	pnn = next_node(state->client, state->num_nodes, incr);

	/* Pass the received payload on as-is */
	msg.srvid = srvid;
	msg.data.data = data;

	subreq = ctdb_client_message_send(state, state->ev, state->client,
					  pnn, &msg);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, message_ring_msg_sent, req);
}
275
message_ring_finish(struct tevent_req * subreq)276 static void message_ring_finish(struct tevent_req *subreq)
277 {
278 struct tevent_req *req = tevent_req_callback_data(
279 subreq, struct tevent_req);
280 struct message_ring_state *state = tevent_req_data(
281 req, struct message_ring_state);
282 bool status;
283 double t;
284
285 status = tevent_wakeup_recv(subreq);
286 TALLOC_FREE(subreq);
287 if (! status) {
288 tevent_req_error(req, EIO);
289 return;
290 }
291
292 t = timeval_elapsed(&state->start_time);
293
294 printf("Ring[%u]: %.2f msgs/sec (+ve=%d -ve=%d)\n",
295 ctdb_client_pnn(state->client), state->msg_count / t,
296 state->msg_plus, state->msg_minus);
297
298 tevent_req_done(req);
299 }
300
/*
 * Collect the result of the benchmark request.
 *
 * Returns true unless tevent_req_is_unix_error() reports an error for
 * req; the error code itself is not passed on to the caller.
 */
static bool message_ring_recv(struct tevent_req *req)
{
	int err;

	return !tevent_req_is_unix_error(req, &err);
}
310
main(int argc,const char * argv[])311 int main(int argc, const char *argv[])
312 {
313 const struct test_options *opts;
314 TALLOC_CTX *mem_ctx;
315 struct tevent_context *ev;
316 struct ctdb_client_context *client;
317 struct tevent_req *req;
318 int ret;
319 bool status;
320
321 setup_logging("message_ring", DEBUG_STDERR);
322
323 status = process_options_basic(argc, argv, &opts);
324 if (! status) {
325 exit(1);
326 }
327
328 mem_ctx = talloc_new(NULL);
329 if (mem_ctx == NULL) {
330 fprintf(stderr, "Memory allocation error\n");
331 exit(1);
332 }
333
334 ev = tevent_context_init(mem_ctx);
335 if (ev == NULL) {
336 fprintf(stderr, "Memory allocation error\n");
337 exit(1);
338 }
339
340 ret = ctdb_client_init(mem_ctx, ev, opts->socket, &client);
341 if (ret != 0) {
342 fprintf(stderr, "Failed to initialize client, ret=%d\n", ret);
343 exit(1);
344 }
345
346 if (! ctdb_recovery_wait(ev, client)) {
347 fprintf(stderr, "Failed to wait for recovery\n");
348 exit(1);
349 }
350
351 req = message_ring_send(mem_ctx, ev, client,
352 opts->num_nodes, opts->timelimit,
353 opts->interactive);
354 if (req == NULL) {
355 fprintf(stderr, "Memory allocation error\n");
356 exit(1);
357 }
358
359 tevent_req_poll(req, ev);
360
361 status = message_ring_recv(req);
362 if (! status) {
363 fprintf(stderr, "message ring test failed\n");
364 exit(1);
365 }
366
367 talloc_free(mem_ctx);
368 return 0;
369 }
370