1 /*
2    Cluster wide synchronization
3 
4    Copyright (C) Amitay Isaacs  2015
5 
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10 
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15 
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 
20 #include "replace.h"
21 #include "system/network.h"
22 
23 #include "lib/util/tevent_unix.h"
24 
25 #include "client/client.h"
26 
27 #include "tests/src/cluster_wait.h"
28 
/*
 * Message srvids used by this helper, taken from the CTDB test srvid range.
 * Every node sends MSG_ID_JOIN (carrying its pnn) to pnn 0; once pnn 0 has
 * seen all nodes it broadcasts MSG_ID_SYNC to the connected nodes.
 */
#define MSG_ID_JOIN	(CTDB_SRVID_TEST_RANGE | 0x1)
#define MSG_ID_SYNC	(CTDB_SRVID_TEST_RANGE | 0x2)
31 
/* Wait until the test clients on all nodes have initialized */

/* Per-request state for cluster_wait_send() / cluster_wait_recv() */
struct cluster_wait_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Number of nodes expected to join; also the length of ready[] */
	uint32_t num_nodes;
	/*
	 * Only allocated on pnn 0: ready[pnn] becomes true once a
	 * MSG_ID_JOIN from that pnn has been received.
	 */
	bool *ready;
	/* pnn 0 only: set once the SYNC sequence has been started, so it
	 * is started at most once */
	bool join_done;
};
41 
/*
 * Callbacks of the cluster_wait request, roughly in the order they run:
 * handler registration (JOIN on pnn 0 only, SYNC everywhere), periodic
 * JOIN messages to pnn 0, pnn 0 collecting JOINs and broadcasting SYNC,
 * and finally every node unregistering its SYNC handler and completing.
 */
static void cluster_wait_join_registered(struct tevent_req *subreq);
static void cluster_wait_sync_registered(struct tevent_req *subreq);
static void cluster_wait_join(struct tevent_req *subreq);
static void cluster_wait_join_sent(struct tevent_req *subreq);
static void cluster_wait_join_handler(uint64_t srvid, TDB_DATA data,
				      void *private_data);
static void cluster_wait_join_unregistered(struct tevent_req *subreq);
static void cluster_wait_sync_sent(struct tevent_req *subreq);
static void cluster_wait_sync_handler(uint64_t srvid, TDB_DATA data,
				      void *private_data);
static void cluster_wait_sync_unregistered(struct tevent_req *subreq);
53 
/*
 * Start waiting until the test clients on all num_nodes nodes are up.
 *
 * Every node registers a MSG_ID_SYNC handler and then repeatedly sends
 * MSG_ID_JOIN (carrying its pnn) to pnn 0.  pnn 0 additionally registers a
 * MSG_ID_JOIN handler and, once every pnn in [0, num_nodes) has joined,
 * broadcasts MSG_ID_SYNC.  A node's request completes after it receives
 * MSG_ID_SYNC and removes its handler.  A 30 second end time is set on
 * the request.
 *
 * @param mem_ctx    talloc parent of the returned request
 * @param ev         event context driving the request
 * @param client     connected CTDB client
 * @param num_nodes  number of nodes that must join (pnns 0..num_nodes-1)
 * @return the new request, or NULL if it could not be allocated
 */
struct tevent_req *cluster_wait_send(TALLOC_CTX *mem_ctx,
				     struct tevent_context *ev,
				     struct ctdb_client_context *client,
				     uint32_t num_nodes)
{
	struct tevent_req *req, *subreq;
	struct cluster_wait_state *state;
	bool ok;

	req = tevent_req_create(mem_ctx, &state, struct cluster_wait_state);
	if (req == NULL) {
		return NULL;
	}

	state->ev = ev;
	state->client = client;
	state->num_nodes = num_nodes;

	state->join_done = false;

	if (ctdb_client_pnn(client) == 0) {
		/* Only pnn 0 collects the JOIN messages of all nodes */
		state->ready = talloc_zero_array(state, bool, num_nodes);
		if (tevent_req_nomem(state->ready, req)) {
			return tevent_req_post(req, ev);
		}

		subreq = ctdb_client_set_message_handler_send(
					state, ev, client, MSG_ID_JOIN,
					cluster_wait_join_handler, req);
		if (tevent_req_nomem(subreq, req)) {
			return tevent_req_post(req, ev);
		}
		tevent_req_set_callback(subreq, cluster_wait_join_registered,
					req);
	}

	subreq = ctdb_client_set_message_handler_send(
					state, ev, client, MSG_ID_SYNC,
					cluster_wait_sync_handler, req);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, cluster_wait_sync_registered, req);

	/* If cluster is not synchronized within 30 seconds, time out */
	ok = tevent_req_set_endtime(
		req,
		ev,
		tevent_timeval_current_ofs(30, 0));
	if (!ok) {
		/*
		 * tevent_req_set_endtime() failed to arm its timer (presumably
		 * out of memory).  Mark the request as failed before posting
		 * it: posting a request that is still in progress would make
		 * cluster_wait_recv() report success.
		 */
		tevent_req_error(req, ENOMEM);
		return tevent_req_post(req, ev);
	}

	return req;
}
109 
cluster_wait_join_registered(struct tevent_req * subreq)110 static void cluster_wait_join_registered(struct tevent_req *subreq)
111 {
112 	struct tevent_req *req = tevent_req_callback_data(
113 		subreq, struct tevent_req);
114 	bool status;
115 	int ret;
116 
117 	status = ctdb_client_set_message_handler_recv(subreq, &ret);
118 	TALLOC_FREE(subreq);
119 	if (! status) {
120 		tevent_req_error(req, ret);
121 		return;
122 	}
123 
124 	printf("Waiting for cluster\n");
125 	fflush(stdout);
126 }
127 
cluster_wait_sync_registered(struct tevent_req * subreq)128 static void cluster_wait_sync_registered(struct tevent_req *subreq)
129 {
130 	struct tevent_req *req = tevent_req_callback_data(
131 		subreq, struct tevent_req);
132 	struct cluster_wait_state *state = tevent_req_data(
133 		req, struct cluster_wait_state);
134 	bool status;
135 	int ret;
136 
137 	status = ctdb_client_set_message_handler_recv(subreq, &ret);
138 	TALLOC_FREE(subreq);
139 	if (! status) {
140 		tevent_req_error(req, ret);
141 		return;
142 	}
143 
144 	subreq = tevent_wakeup_send(state, state->ev,
145 				    tevent_timeval_current_ofs(1, 0));
146 	if (tevent_req_nomem(subreq, req)) {
147 		return;
148 	}
149 	tevent_req_set_callback(subreq, cluster_wait_join, req);
150 }
151 
cluster_wait_join(struct tevent_req * subreq)152 static void cluster_wait_join(struct tevent_req *subreq)
153 {
154 	struct tevent_req *req = tevent_req_callback_data(
155 		subreq, struct tevent_req);
156 	struct cluster_wait_state *state = tevent_req_data(
157 		req, struct cluster_wait_state);
158 	struct ctdb_req_message msg;
159 	uint32_t pnn;
160 	bool status;
161 
162 	status = tevent_wakeup_recv(subreq);
163 	TALLOC_FREE(subreq);
164 	if (! status) {
165 		tevent_req_error(req, EIO);
166 		return;
167 	}
168 
169 	pnn = ctdb_client_pnn(state->client);
170 
171 	msg.srvid = MSG_ID_JOIN;
172 	msg.data.data.dsize = sizeof(pnn);
173 	msg.data.data.dptr = (uint8_t *)&pnn;
174 
175 	subreq = ctdb_client_message_send(state, state->ev, state->client,
176 					  0, &msg);
177 	if (tevent_req_nomem(subreq, req)) {
178 		return;
179 	}
180 	tevent_req_set_callback(subreq, cluster_wait_join_sent, req);
181 }
182 
cluster_wait_join_sent(struct tevent_req * subreq)183 static void cluster_wait_join_sent(struct tevent_req *subreq)
184 {
185 	struct tevent_req *req = tevent_req_callback_data(
186 		subreq, struct tevent_req);
187 	struct cluster_wait_state *state = tevent_req_data(
188 		req, struct cluster_wait_state);
189 	bool status;
190 	int ret;
191 
192 	status = ctdb_client_message_recv(subreq, &ret);
193 	TALLOC_FREE(subreq);
194 	if (! status) {
195 		tevent_req_error(req, ret);
196 		return;
197 	}
198 
199 	subreq = tevent_wakeup_send(state, state->ev,
200 				    tevent_timeval_current_ofs(1, 0));
201 	if (tevent_req_nomem(subreq, req)) {
202 		return;
203 	}
204 	tevent_req_set_callback(subreq, cluster_wait_join, req);
205 }
206 
/*
 * MSG_ID_JOIN handler, registered on pnn 0 only.
 *
 * Records the pnn carried in each JOIN message.  Once every pnn in
 * [0, num_nodes) has been seen, it starts removing this handler exactly once.
 * The MSG_ID_SYNC broadcast follows from the removal callback.  Messages with
 * an unexpected srvid, payload size or out-of-range pnn are ignored.
 */
static void cluster_wait_join_handler(uint64_t srvid, TDB_DATA data,
				      void *private_data)
{
	struct tevent_req *req = talloc_get_type_abort(
		private_data, struct tevent_req);
	struct cluster_wait_state *state = tevent_req_data(
		req, struct cluster_wait_state);
	struct tevent_req *subreq;
	uint32_t pnn;
	uint32_t i;

	if (srvid != MSG_ID_JOIN) {
		return;
	}

	if (data.dsize != sizeof(pnn)) {
		return;
	}

	/*
	 * The payload buffer is not guaranteed to be aligned for uint32_t;
	 * copy it out instead of dereferencing a cast pointer.
	 */
	memcpy(&pnn, data.dptr, sizeof(pnn));

	/* ready[] has num_nodes entries, so valid pnns are 0..num_nodes-1 */
	if (pnn >= state->num_nodes) {
		return;
	}

	state->ready[pnn] = true;

	for (i=0; i<state->num_nodes; i++) {
		if (! state->ready[i]) {
			return;
		}
	}

	/* JOINs keep arriving after everyone is ready; act only once */
	if (state->join_done) {
		return;
	}

	state->join_done = true;
	subreq = ctdb_client_remove_message_handler_send(
					state, state->ev, state->client,
					MSG_ID_JOIN, req);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, cluster_wait_join_unregistered, req);
}
253 
cluster_wait_join_unregistered(struct tevent_req * subreq)254 static void cluster_wait_join_unregistered(struct tevent_req *subreq)
255 {
256 	struct tevent_req *req = tevent_req_callback_data(
257 		subreq, struct tevent_req);
258 	struct cluster_wait_state *state = tevent_req_data(
259 		req, struct cluster_wait_state);
260 	struct ctdb_req_message msg;
261 	bool status;
262 	int ret;
263 
264 	status = ctdb_client_remove_message_handler_recv(subreq, &ret);
265 	if (! status) {
266 		tevent_req_error(req, ret);
267 		return;
268 	}
269 
270 	msg.srvid = MSG_ID_SYNC;
271 	msg.data.data = tdb_null;
272 
273 	subreq = ctdb_client_message_send(state, state->ev, state->client,
274 					  CTDB_BROADCAST_CONNECTED, &msg);
275 	if (tevent_req_nomem(subreq, req)) {
276 		return;
277 	}
278 	tevent_req_set_callback(subreq, cluster_wait_sync_sent, req);
279 }
280 
cluster_wait_sync_sent(struct tevent_req * subreq)281 static void cluster_wait_sync_sent(struct tevent_req *subreq)
282 {
283 	struct tevent_req *req = tevent_req_callback_data(
284 		subreq, struct tevent_req);
285 	bool status;
286 	int ret;
287 
288 	status = ctdb_client_message_recv(subreq, &ret);
289 	TALLOC_FREE(subreq);
290 	if (! status) {
291 		tevent_req_error(req, ret);
292 		return;
293 	}
294 }
295 
/*
 * MSG_ID_SYNC handler: the release broadcast arrived.  Remove this handler;
 * the request completes once the removal finishes.
 */
static void cluster_wait_sync_handler(uint64_t srvid, TDB_DATA data,
				      void *private_data)
{
	struct tevent_req *req = talloc_get_type_abort(
		private_data, struct tevent_req);
	struct cluster_wait_state *state = tevent_req_data(
		req, struct cluster_wait_state);
	struct tevent_req *unreg;

	/* Ignore anything other than the SYNC message */
	if (srvid != MSG_ID_SYNC) {
		return;
	}

	unreg = ctdb_client_remove_message_handler_send(state, state->ev,
							state->client,
							MSG_ID_SYNC, req);
	if (tevent_req_nomem(unreg, req)) {
		return;
	}
	tevent_req_set_callback(unreg, cluster_wait_sync_unregistered, req);
}
317 
cluster_wait_sync_unregistered(struct tevent_req * subreq)318 static void cluster_wait_sync_unregistered(struct tevent_req *subreq)
319 {
320 	struct tevent_req *req = tevent_req_callback_data(
321 		subreq, struct tevent_req);
322 	bool status;
323 	int ret;
324 
325 	status = ctdb_client_remove_message_handler_recv(subreq, &ret);
326 	TALLOC_FREE(subreq);
327 	if (! status) {
328 		tevent_req_error(req, ret);
329 		return;
330 	}
331 
332 	tevent_req_done(req);
333 }
334 
cluster_wait_recv(struct tevent_req * req,int * perr)335 bool cluster_wait_recv(struct tevent_req *req, int *perr)
336 {
337 	int err;
338 
339 	if (tevent_req_is_unix_error(req, &err)) {
340 		if (perr != NULL) {
341 			*perr = err;
342 		}
343 		return false;
344 	}
345 	return true;
346 }
347