1 /*
2 Cluster wide synchronization
3
4 Copyright (C) Amitay Isaacs 2015
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22
23 #include "lib/util/tevent_unix.h"
24
25 #include "client/client.h"
26
27 #include "tests/src/cluster_wait.h"
28
/* Message sent by each node to node 0 to announce that it has started */
#define MSG_ID_JOIN	(CTDB_SRVID_TEST_RANGE | 0x1)
/* Message broadcast once every node has announced itself */
#define MSG_ID_SYNC	(CTDB_SRVID_TEST_RANGE | 0x2)
31
32 /* Wait for all the clients to initialize */
33
/*
 * Per-request state for cluster_wait_send()/cluster_wait_recv().
 */
struct cluster_wait_state {
	/* Event context used for timers and all sub-requests */
	struct tevent_context *ev;
	/* ctdb client used to register handlers and send messages */
	struct ctdb_client_context *client;
	/* Number of nodes that must join; also the length of ready[] */
	uint32_t num_nodes;
	/*
	 * ready[pnn] becomes true once a join message from that node has
	 * been received.  Only allocated on the node with pnn 0.
	 */
	bool *ready;
	/* Set once all nodes have joined, so the sync is triggered once */
	bool join_done;
};
41
42 static void cluster_wait_join_registered(struct tevent_req *subreq);
43 static void cluster_wait_sync_registered(struct tevent_req *subreq);
44 static void cluster_wait_join(struct tevent_req *subreq);
45 static void cluster_wait_join_sent(struct tevent_req *subreq);
46 static void cluster_wait_join_handler(uint64_t srvid, TDB_DATA data,
47 void *private_data);
48 static void cluster_wait_join_unregistered(struct tevent_req *subreq);
49 static void cluster_wait_sync_sent(struct tevent_req *subreq);
50 static void cluster_wait_sync_handler(uint64_t srvid, TDB_DATA data,
51 void *private_data);
52 static void cluster_wait_sync_unregistered(struct tevent_req *subreq);
53
/*
 * Start an asynchronous wait for all nodes of the cluster.
 *
 * Every node registers a handler for MSG_ID_SYNC.  The node whose pnn
 * is 0 additionally allocates the ready[] table and registers a handler
 * for MSG_ID_JOIN, so that it can collect the join messages of all
 * num_nodes nodes.
 *
 * The request is given a deadline of 30 seconds from now.
 *
 * Returns NULL if the request could not be created; otherwise a request
 * whose result is collected with cluster_wait_recv().
 */
struct tevent_req *cluster_wait_send(TALLOC_CTX *mem_ctx,
				     struct tevent_context *ev,
				     struct ctdb_client_context *client,
				     uint32_t num_nodes)
{
	struct cluster_wait_state *state;
	struct tevent_req *req;
	struct tevent_req *join_reg;
	struct tevent_req *sync_reg;
	struct timeval deadline;

	req = tevent_req_create(mem_ctx, &state, struct cluster_wait_state);
	if (req == NULL) {
		return NULL;
	}

	state->ev = ev;
	state->client = client;
	state->num_nodes = num_nodes;
	state->join_done = false;

	/* Node 0 keeps track of which nodes have joined */
	if (ctdb_client_pnn(client) == 0) {
		state->ready = talloc_zero_array(state, bool, num_nodes);
		if (tevent_req_nomem(state->ready, req)) {
			goto post;
		}

		join_reg = ctdb_client_set_message_handler_send(
				state, ev, client, MSG_ID_JOIN,
				cluster_wait_join_handler, req);
		if (tevent_req_nomem(join_reg, req)) {
			goto post;
		}
		tevent_req_set_callback(join_reg,
					cluster_wait_join_registered,
					req);
	}

	sync_reg = ctdb_client_set_message_handler_send(
			state, ev, client, MSG_ID_SYNC,
			cluster_wait_sync_handler, req);
	if (tevent_req_nomem(sync_reg, req)) {
		goto post;
	}
	tevent_req_set_callback(sync_reg, cluster_wait_sync_registered, req);

	/* If cluster is not synchronized within 30 seconds, time out */
	deadline = tevent_timeval_current_ofs(30, 0);
	if (!tevent_req_set_endtime(req, ev, deadline)) {
		goto post;
	}

	return req;

post:
	return tevent_req_post(req, ev);
}
109
cluster_wait_join_registered(struct tevent_req * subreq)110 static void cluster_wait_join_registered(struct tevent_req *subreq)
111 {
112 struct tevent_req *req = tevent_req_callback_data(
113 subreq, struct tevent_req);
114 bool status;
115 int ret;
116
117 status = ctdb_client_set_message_handler_recv(subreq, &ret);
118 TALLOC_FREE(subreq);
119 if (! status) {
120 tevent_req_error(req, ret);
121 return;
122 }
123
124 printf("Waiting for cluster\n");
125 fflush(stdout);
126 }
127
cluster_wait_sync_registered(struct tevent_req * subreq)128 static void cluster_wait_sync_registered(struct tevent_req *subreq)
129 {
130 struct tevent_req *req = tevent_req_callback_data(
131 subreq, struct tevent_req);
132 struct cluster_wait_state *state = tevent_req_data(
133 req, struct cluster_wait_state);
134 bool status;
135 int ret;
136
137 status = ctdb_client_set_message_handler_recv(subreq, &ret);
138 TALLOC_FREE(subreq);
139 if (! status) {
140 tevent_req_error(req, ret);
141 return;
142 }
143
144 subreq = tevent_wakeup_send(state, state->ev,
145 tevent_timeval_current_ofs(1, 0));
146 if (tevent_req_nomem(subreq, req)) {
147 return;
148 }
149 tevent_req_set_callback(subreq, cluster_wait_join, req);
150 }
151
cluster_wait_join(struct tevent_req * subreq)152 static void cluster_wait_join(struct tevent_req *subreq)
153 {
154 struct tevent_req *req = tevent_req_callback_data(
155 subreq, struct tevent_req);
156 struct cluster_wait_state *state = tevent_req_data(
157 req, struct cluster_wait_state);
158 struct ctdb_req_message msg;
159 uint32_t pnn;
160 bool status;
161
162 status = tevent_wakeup_recv(subreq);
163 TALLOC_FREE(subreq);
164 if (! status) {
165 tevent_req_error(req, EIO);
166 return;
167 }
168
169 pnn = ctdb_client_pnn(state->client);
170
171 msg.srvid = MSG_ID_JOIN;
172 msg.data.data.dsize = sizeof(pnn);
173 msg.data.data.dptr = (uint8_t *)&pnn;
174
175 subreq = ctdb_client_message_send(state, state->ev, state->client,
176 0, &msg);
177 if (tevent_req_nomem(subreq, req)) {
178 return;
179 }
180 tevent_req_set_callback(subreq, cluster_wait_join_sent, req);
181 }
182
cluster_wait_join_sent(struct tevent_req * subreq)183 static void cluster_wait_join_sent(struct tevent_req *subreq)
184 {
185 struct tevent_req *req = tevent_req_callback_data(
186 subreq, struct tevent_req);
187 struct cluster_wait_state *state = tevent_req_data(
188 req, struct cluster_wait_state);
189 bool status;
190 int ret;
191
192 status = ctdb_client_message_recv(subreq, &ret);
193 TALLOC_FREE(subreq);
194 if (! status) {
195 tevent_req_error(req, ret);
196 return;
197 }
198
199 subreq = tevent_wakeup_send(state, state->ev,
200 tevent_timeval_current_ofs(1, 0));
201 if (tevent_req_nomem(subreq, req)) {
202 return;
203 }
204 tevent_req_set_callback(subreq, cluster_wait_join, req);
205 }
206
/*
 * Handler for MSG_ID_JOIN messages (registered on node 0 only).
 *
 * The payload is the pnn of the sending node.  The node is marked as
 * ready and, once every node in [0, num_nodes) is ready, removal of the
 * join handler is started; cluster_wait_join_unregistered() continues
 * from there.
 *
 * Messages with an unexpected srvid, an unexpected payload size or an
 * out-of-range pnn are silently ignored.
 */
static void cluster_wait_join_handler(uint64_t srvid, TDB_DATA data,
				      void *private_data)
{
	struct tevent_req *req = talloc_get_type_abort(
		private_data, struct tevent_req);
	struct cluster_wait_state *state = tevent_req_data(
		req, struct cluster_wait_state);
	struct tevent_req *subreq;
	uint32_t pnn;
	uint32_t i;

	if (srvid != MSG_ID_JOIN) {
		return;
	}

	if (data.dsize != sizeof(uint32_t)) {
		return;
	}

	pnn = *(uint32_t *)data.dptr;

	/*
	 * ready[] has exactly num_nodes entries, so valid indexes are
	 * 0 .. num_nodes - 1.  Rejecting only pnn > num_nodes would allow
	 * a write one element past the end of the array.
	 */
	if (pnn >= state->num_nodes) {
		return;
	}

	state->ready[pnn] = true;

	for (i=0; i<state->num_nodes; i++) {
		if (! state->ready[i]) {
			return;
		}
	}

	/* Join messages keep arriving; only trigger the next step once */
	if (state->join_done) {
		return;
	}

	state->join_done = true;
	subreq = ctdb_client_remove_message_handler_send(
					state, state->ev, state->client,
					MSG_ID_JOIN, req);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, cluster_wait_join_unregistered, req);
}
253
cluster_wait_join_unregistered(struct tevent_req * subreq)254 static void cluster_wait_join_unregistered(struct tevent_req *subreq)
255 {
256 struct tevent_req *req = tevent_req_callback_data(
257 subreq, struct tevent_req);
258 struct cluster_wait_state *state = tevent_req_data(
259 req, struct cluster_wait_state);
260 struct ctdb_req_message msg;
261 bool status;
262 int ret;
263
264 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
265 if (! status) {
266 tevent_req_error(req, ret);
267 return;
268 }
269
270 msg.srvid = MSG_ID_SYNC;
271 msg.data.data = tdb_null;
272
273 subreq = ctdb_client_message_send(state, state->ev, state->client,
274 CTDB_BROADCAST_CONNECTED, &msg);
275 if (tevent_req_nomem(subreq, req)) {
276 return;
277 }
278 tevent_req_set_callback(subreq, cluster_wait_sync_sent, req);
279 }
280
cluster_wait_sync_sent(struct tevent_req * subreq)281 static void cluster_wait_sync_sent(struct tevent_req *subreq)
282 {
283 struct tevent_req *req = tevent_req_callback_data(
284 subreq, struct tevent_req);
285 bool status;
286 int ret;
287
288 status = ctdb_client_message_recv(subreq, &ret);
289 TALLOC_FREE(subreq);
290 if (! status) {
291 tevent_req_error(req, ret);
292 return;
293 }
294 }
295
/*
 * Handler for MSG_ID_SYNC messages.
 *
 * The payload is not examined.  Receiving the message starts removal of
 * the sync handler; cluster_wait_sync_unregistered() then completes the
 * request.  Messages with any other srvid are ignored.
 */
static void cluster_wait_sync_handler(uint64_t srvid, TDB_DATA data,
				      void *private_data)
{
	struct tevent_req *req;
	struct cluster_wait_state *state;
	struct tevent_req *remove_req;

	req = talloc_get_type_abort(private_data, struct tevent_req);
	state = tevent_req_data(req, struct cluster_wait_state);

	if (srvid != MSG_ID_SYNC) {
		return;
	}

	remove_req = ctdb_client_remove_message_handler_send(state,
							     state->ev,
							     state->client,
							     MSG_ID_SYNC,
							     req);
	if (tevent_req_nomem(remove_req, req)) {
		return;
	}
	tevent_req_set_callback(remove_req,
				cluster_wait_sync_unregistered,
				req);
}
317
cluster_wait_sync_unregistered(struct tevent_req * subreq)318 static void cluster_wait_sync_unregistered(struct tevent_req *subreq)
319 {
320 struct tevent_req *req = tevent_req_callback_data(
321 subreq, struct tevent_req);
322 bool status;
323 int ret;
324
325 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
326 TALLOC_FREE(subreq);
327 if (! status) {
328 tevent_req_error(req, ret);
329 return;
330 }
331
332 tevent_req_done(req);
333 }
334
/*
 * Collect the result of a request started with cluster_wait_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code of the request in *perr.
 */
bool cluster_wait_recv(struct tevent_req *req, int *perr)
{
	int err = 0;
	bool failed;

	failed = tevent_req_is_unix_error(req, &err);
	if (!failed) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
347