1 /*
2    CTDB client code
3 
4    Copyright (C) Amitay Isaacs  2015
5 
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10 
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15 
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 
28 #include "common/logging.h"
29 
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34 
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
39 
client_db_tdb(struct ctdb_db_context * db)40 struct tdb_context *client_db_tdb(struct ctdb_db_context *db)
41 {
42 	return db->ltdb->tdb;
43 }
44 
client_db_handle(struct ctdb_client_context * client,const char * db_name)45 static struct ctdb_db_context *client_db_handle(
46 					struct ctdb_client_context *client,
47 					const char *db_name)
48 {
49 	struct ctdb_db_context *db;
50 
51 	for (db = client->db; db != NULL; db = db->next) {
52 		if (strcmp(db_name, db->db_name) == 0) {
53 			return db;
54 		}
55 	}
56 
57 	return NULL;
58 }
59 
ctdb_db_persistent(struct ctdb_db_context * db)60 static bool ctdb_db_persistent(struct ctdb_db_context *db)
61 {
62 	if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
63 		return true;
64 	}
65 	return false;
66 }
67 
ctdb_db_replicated(struct ctdb_db_context * db)68 static bool ctdb_db_replicated(struct ctdb_db_context *db)
69 {
70 	if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
71 		return true;
72 	}
73 	return false;
74 }
75 
ctdb_db_volatile(struct ctdb_db_context * db)76 static bool ctdb_db_volatile(struct ctdb_db_context *db)
77 {
78 	if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT ||
79 	    db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
80 		return false;
81 	}
82 	return true;
83 }
84 
/*
 * Per-request state for the internal "set database flags" operation
 * (ctdb_set_db_flags_send/_recv).
 */
struct ctdb_set_db_flags_state {
	/* Parameters captured from ctdb_set_db_flags_send() */
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	uint32_t db_id;
	uint8_t db_flags;

	/* The request finishes once both of these are true */
	bool readonly_done;
	bool sticky_done;

	/* Result of list_of_connected_nodes(): node list and its length */
	uint32_t *pnn_list;
	int count;
};

/* Completion callbacks for the sub-requests issued by this operation */
static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
99 
/*
 * Start an async request that applies the READONLY and/or STICKY flag
 * to database db_id.
 *
 * When db_flags contains neither CTDB_DB_FLAGS_READONLY nor
 * CTDB_DB_FLAGS_STICKY the request is completed immediately.  Otherwise
 * a GET_NODEMAP control is sent to destnode and processing continues in
 * ctdb_set_db_flags_nodemap_done().
 *
 * Returns NULL only if the request itself could not be created.
 */
static struct tevent_req *ctdb_set_db_flags_send(
				TALLOC_CTX *mem_ctx,
				struct tevent_context *ev,
				struct ctdb_client_context *client,
				uint32_t destnode, struct timeval timeout,
				uint32_t db_id, uint8_t db_flags)
{
	struct ctdb_set_db_flags_state *state;
	struct ctdb_req_control nodemap_request;
	struct tevent_req *req;
	struct tevent_req *subreq;

	req = tevent_req_create(mem_ctx, &state,
				struct ctdb_set_db_flags_state);
	if (req == NULL) {
		return NULL;
	}

	/* No flag that needs a control: nothing to do */
	if ((db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY)) == 0) {
		tevent_req_done(req);
		return tevent_req_post(req, ev);
	}

	state->ev = ev;
	state->client = client;
	state->timeout = timeout;
	state->db_id = db_id;
	state->db_flags = db_flags;

	ctdb_req_control_get_nodemap(&nodemap_request);
	subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
					  &nodemap_request);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);

	return req;
}
138 
ctdb_set_db_flags_nodemap_done(struct tevent_req * subreq)139 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
140 {
141 	struct tevent_req *req = tevent_req_callback_data(
142 		subreq, struct tevent_req);
143 	struct ctdb_set_db_flags_state *state = tevent_req_data(
144 		req, struct ctdb_set_db_flags_state);
145 	struct ctdb_req_control request;
146 	struct ctdb_reply_control *reply;
147 	struct ctdb_node_map *nodemap;
148 	int ret;
149 	bool status;
150 
151 	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
152 	TALLOC_FREE(subreq);
153 	if (! status) {
154 		DEBUG(DEBUG_ERR,
155 		      ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
156 		       state->db_id, ret));
157 		tevent_req_error(req, ret);
158 		return;
159 	}
160 
161 	ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
162 	talloc_free(reply);
163 	if (ret != 0) {
164 		DEBUG(DEBUG_ERR,
165 		      ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
166 		      state->db_id, ret));
167 		tevent_req_error(req, ret);
168 		return;
169 	}
170 
171 	state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
172 					       state, &state->pnn_list);
173 	talloc_free(nodemap);
174 	if (state->count <= 0) {
175 		DEBUG(DEBUG_ERR,
176 		      ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
177 		       state->db_id, state->count));
178 		tevent_req_error(req, ENOMEM);
179 		return;
180 	}
181 
182 	if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
183 		ctdb_req_control_set_db_readonly(&request, state->db_id);
184 		subreq = ctdb_client_control_multi_send(
185 					state, state->ev, state->client,
186 					state->pnn_list, state->count,
187 					state->timeout, &request);
188 		if (tevent_req_nomem(subreq, req)) {
189 			return;
190 		}
191 		tevent_req_set_callback(subreq,
192 					ctdb_set_db_flags_readonly_done, req);
193 	} else {
194 		state->readonly_done = true;
195 	}
196 
197 	if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
198 		ctdb_req_control_set_db_sticky(&request, state->db_id);
199 		subreq = ctdb_client_control_multi_send(
200 					state, state->ev, state->client,
201 					state->pnn_list, state->count,
202 					state->timeout, &request);
203 		if (tevent_req_nomem(subreq, req)) {
204 			return;
205 		}
206 		tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
207 					req);
208 	} else {
209 		state->sticky_done = true;
210 	}
211 }
212 
ctdb_set_db_flags_readonly_done(struct tevent_req * subreq)213 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
214 {
215 	struct tevent_req *req = tevent_req_callback_data(
216 		subreq, struct tevent_req);
217 	struct ctdb_set_db_flags_state *state = tevent_req_data(
218 		req, struct ctdb_set_db_flags_state);
219 	int ret;
220 	bool status;
221 
222 	status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
223 						NULL);
224 	TALLOC_FREE(subreq);
225 	if (! status) {
226 		DEBUG(DEBUG_ERR,
227 		      ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
228 		       state->db_id, ret));
229 		tevent_req_error(req, ret);
230 		return;
231 	}
232 
233 	state->readonly_done = true;
234 
235 	if (state->readonly_done && state->sticky_done) {
236 		tevent_req_done(req);
237 	}
238 }
239 
ctdb_set_db_flags_sticky_done(struct tevent_req * subreq)240 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
241 {
242 	struct tevent_req *req = tevent_req_callback_data(
243 		subreq, struct tevent_req);
244 	struct ctdb_set_db_flags_state *state = tevent_req_data(
245 		req, struct ctdb_set_db_flags_state);
246 	int ret;
247 	bool status;
248 
249 	status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
250 						NULL);
251 	TALLOC_FREE(subreq);
252 	if (! status) {
253 		DEBUG(DEBUG_ERR,
254 		      ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
255 		       state->db_id, ret));
256 		tevent_req_error(req, ret);
257 		return;
258 	}
259 
260 	state->sticky_done = true;
261 
262 	if (state->readonly_done && state->sticky_done) {
263 		tevent_req_done(req);
264 	}
265 }
266 
/*
 * Collect the result of ctdb_set_db_flags_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
{
	int unix_err;

	if (! tevent_req_is_unix_error(req, &unix_err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
279 
/* Per-request state for ctdb_attach_send/_recv. */
struct ctdb_attach_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct timeval timeout;
	/* Node all attach controls are sent to (the client's own PNN) */
	uint32_t destnode;
	/* Flags requested by the caller */
	uint8_t db_flags;
	/* Handle being attached, or the already attached handle */
	struct ctdb_db_context *db;
};

/* Completion callbacks, listed in the order the attach proceeds */
static void ctdb_attach_dbid_done(struct tevent_req *subreq);
static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
static void ctdb_attach_health_done(struct tevent_req *subreq);
static void ctdb_attach_flags_done(struct tevent_req *subreq);
static void ctdb_attach_open_flags_done(struct tevent_req *subreq);
294 
/*
 * Start attaching to the database db_name.
 *
 * If the client already holds a handle with that name, the request
 * completes immediately and ctdb_attach_recv() returns the existing
 * handle (db_flags is not consulted in that case).
 *
 * Otherwise a new handle is allocated and the matching attach control
 * (DB_ATTACH_PERSISTENT, DB_ATTACH_REPLICATED or DB_ATTACH, selected by
 * db_flags) is sent to the client's own node.  The remaining steps are
 * driven from ctdb_attach_dbid_done().
 *
 * Returns NULL only if the request itself could not be created; any
 * other failure is reported through ctdb_attach_recv().
 */
struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
				    struct tevent_context *ev,
				    struct ctdb_client_context *client,
				    struct timeval timeout,
				    const char *db_name, uint8_t db_flags)
{
	struct tevent_req *req, *subreq;
	struct ctdb_attach_state *state;
	struct ctdb_req_control request;

	req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
	if (req == NULL) {
		return NULL;
	}

	/* Already attached: hand back the existing handle */
	state->db = client_db_handle(client, db_name);
	if (state->db != NULL) {
		tevent_req_done(req);
		return tevent_req_post(req, ev);
	}

	state->ev = ev;
	state->client = client;
	state->timeout = timeout;
	state->destnode = ctdb_client_pnn(client);
	state->db_flags = db_flags;

	/*
	 * The handle is parented to the client, not to the request, so
	 * that it outlives the request.
	 * NOTE(review): the failure paths of this request do not free it.
	 */
	state->db = talloc_zero(client, struct ctdb_db_context);
	if (tevent_req_nomem(state->db, req)) {
		return tevent_req_post(req, ev);
	}

	/*
	 * Check the duplicated name itself.  Testing state->db here would
	 * never fire and would let a NULL db_name reach the attach control
	 * and later strcmp() lookups.
	 */
	state->db->db_name = talloc_strdup(state->db, db_name);
	if (tevent_req_nomem(state->db->db_name, req)) {
		return tevent_req_post(req, ev);
	}

	state->db->db_flags = db_flags;

	if (ctdb_db_persistent(state->db)) {
		ctdb_req_control_db_attach_persistent(&request,
						      state->db->db_name);
	} else if (ctdb_db_replicated(state->db)) {
		ctdb_req_control_db_attach_replicated(&request,
						      state->db->db_name);
	} else {
		ctdb_req_control_db_attach(&request, state->db->db_name);
	}

	subreq = ctdb_client_control_send(state, state->ev, state->client,
					  state->destnode, state->timeout,
					  &request);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);

	return req;
}
354 
ctdb_attach_dbid_done(struct tevent_req * subreq)355 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
356 {
357 	struct tevent_req *req = tevent_req_callback_data(
358 		subreq, struct tevent_req);
359 	struct ctdb_attach_state *state = tevent_req_data(
360 		req, struct ctdb_attach_state);
361 	struct ctdb_req_control request;
362 	struct ctdb_reply_control *reply;
363 	bool status;
364 	int ret;
365 
366 	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
367 	TALLOC_FREE(subreq);
368 	if (! status) {
369 		DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
370 				  state->db->db_name,
371 				  (ctdb_db_persistent(state->db)
372 					? "DB_ATTACH_PERSISTENT"
373 					: (ctdb_db_replicated(state->db)
374 						? "DB_ATTACH_REPLICATED"
375 						: "DB_ATTACH")),
376 				  ret));
377 		tevent_req_error(req, ret);
378 		return;
379 	}
380 
381 	if (ctdb_db_persistent(state->db)) {
382 		ret = ctdb_reply_control_db_attach_persistent(
383 				reply, &state->db->db_id);
384 	} else if (ctdb_db_replicated(state->db)) {
385 		ret = ctdb_reply_control_db_attach_replicated(
386 				reply, &state->db->db_id);
387 	} else {
388 		ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
389 	}
390 	talloc_free(reply);
391 	if (ret != 0) {
392 		DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
393 				  state->db->db_name, ret));
394 		tevent_req_error(req, ret);
395 		return;
396 	}
397 
398 	ctdb_req_control_getdbpath(&request, state->db->db_id);
399 	subreq = ctdb_client_control_send(state, state->ev, state->client,
400 					  state->destnode, state->timeout,
401 					  &request);
402 	if (tevent_req_nomem(subreq, req)) {
403 		return;
404 	}
405 	tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
406 }
407 
ctdb_attach_dbpath_done(struct tevent_req * subreq)408 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
409 {
410 	struct tevent_req *req = tevent_req_callback_data(
411 		subreq, struct tevent_req);
412 	struct ctdb_attach_state *state = tevent_req_data(
413 		req, struct ctdb_attach_state);
414 	struct ctdb_reply_control *reply;
415 	struct ctdb_req_control request;
416 	bool status;
417 	int ret;
418 
419 	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
420 	TALLOC_FREE(subreq);
421 	if (! status) {
422 		DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
423 				  state->db->db_name, ret));
424 		tevent_req_error(req, ret);
425 		return;
426 	}
427 
428 	ret = ctdb_reply_control_getdbpath(reply, state->db,
429 					   &state->db->db_path);
430 	talloc_free(reply);
431 	if (ret != 0) {
432 		DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
433 				  state->db->db_name, ret));
434 		tevent_req_error(req, ret);
435 		return;
436 	}
437 
438 	ctdb_req_control_db_get_health(&request, state->db->db_id);
439 	subreq = ctdb_client_control_send(state, state->ev, state->client,
440 					  state->destnode, state->timeout,
441 					  &request);
442 	if (tevent_req_nomem(subreq, req)) {
443 		return;
444 	}
445 	tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
446 }
447 
ctdb_attach_health_done(struct tevent_req * subreq)448 static void ctdb_attach_health_done(struct tevent_req *subreq)
449 {
450 	struct tevent_req *req = tevent_req_callback_data(
451 		subreq, struct tevent_req);
452 	struct ctdb_attach_state *state = tevent_req_data(
453 		req, struct ctdb_attach_state);
454 	struct ctdb_reply_control *reply;
455 	const char *reason;
456 	bool status;
457 	int ret;
458 
459 	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
460 	TALLOC_FREE(subreq);
461 	if (! status) {
462 		DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
463 				  state->db->db_name, ret));
464 		tevent_req_error(req, ret);
465 		return;
466 	}
467 
468 	ret = ctdb_reply_control_db_get_health(reply, state, &reason);
469 	if (ret != 0) {
470 		DEBUG(DEBUG_ERR,
471 		      ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
472 		       state->db->db_name, ret));
473 		tevent_req_error(req, ret);
474 		return;
475 	}
476 
477 	if (reason != NULL) {
478 		/* Database unhealthy, avoid attach */
479 		DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
480 				  state->db->db_name, reason));
481 		tevent_req_error(req, EIO);
482 		return;
483 	}
484 
485 	subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
486 					state->destnode, state->timeout,
487 					state->db->db_id, state->db_flags);
488 	if (tevent_req_nomem(subreq, req)) {
489 		return;
490 	}
491 	tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
492 }
493 
ctdb_attach_flags_done(struct tevent_req * subreq)494 static void ctdb_attach_flags_done(struct tevent_req *subreq)
495 {
496 	struct tevent_req *req = tevent_req_callback_data(
497 		subreq, struct tevent_req);
498 	struct ctdb_attach_state *state = tevent_req_data(
499 		req, struct ctdb_attach_state);
500 	struct ctdb_req_control request;
501 	bool status;
502 	int ret;
503 
504 	status = ctdb_set_db_flags_recv(subreq, &ret);
505 	TALLOC_FREE(subreq);
506 	if (! status) {
507 		DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
508 				  state->db->db_name, state->db_flags));
509 		tevent_req_error(req, ret);
510 		return;
511 	}
512 
513 	ctdb_req_control_db_open_flags(&request, state->db->db_id);
514 	subreq = ctdb_client_control_send(state, state->ev, state->client,
515 					  state->destnode, state->timeout,
516 					  &request);
517 	if (tevent_req_nomem(subreq, req)) {
518 		return;
519 	}
520 	tevent_req_set_callback(subreq, ctdb_attach_open_flags_done, req);
521 }
522 
ctdb_attach_open_flags_done(struct tevent_req * subreq)523 static void ctdb_attach_open_flags_done(struct tevent_req *subreq)
524 {
525 	struct tevent_req *req = tevent_req_callback_data(
526 		subreq, struct tevent_req);
527 	struct ctdb_attach_state *state = tevent_req_data(
528 		req, struct ctdb_attach_state);
529 	struct ctdb_reply_control *reply;
530 	bool status;
531 	int ret, tdb_flags;
532 
533 	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
534 	TALLOC_FREE(subreq);
535 	if (! status) {
536 		DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS failed, ret=%d\n",
537 				  state->db->db_name, ret));
538 		tevent_req_error(req, ret);
539 		return;
540 	}
541 
542 	ret = ctdb_reply_control_db_open_flags(reply, &tdb_flags);
543 	talloc_free(reply);
544 	if (ret != 0) {
545 		DEBUG(DEBUG_ERR, ("attach: %s DB_OPEN_FLAGS parse failed,"
546 				  " ret=%d\n", state->db->db_name, ret));
547 		tevent_req_error(req, ret);
548 		return;
549 	}
550 
551 	state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
552 					tdb_flags, O_RDWR, 0);
553 	if (tevent_req_nomem(state->db->ltdb, req)) {
554 		DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
555 				  state->db->db_name));
556 		return;
557 	}
558 	DLIST_ADD(state->client->db, state->db);
559 
560 	tevent_req_done(req);
561 }
562 
ctdb_attach_recv(struct tevent_req * req,int * perr,struct ctdb_db_context ** out)563 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
564 		      struct ctdb_db_context **out)
565 {
566 	struct ctdb_attach_state *state = tevent_req_data(
567 		req, struct ctdb_attach_state);
568 	int err;
569 
570 	if (tevent_req_is_unix_error(req, &err)) {
571 		if (perr != NULL) {
572 			*perr = err;
573 		}
574 		return false;
575 	}
576 
577 	if (out != NULL) {
578 		*out = state->db;
579 	}
580 	return true;
581 }
582 
ctdb_attach(struct tevent_context * ev,struct ctdb_client_context * client,struct timeval timeout,const char * db_name,uint8_t db_flags,struct ctdb_db_context ** out)583 int ctdb_attach(struct tevent_context *ev,
584 		struct ctdb_client_context *client,
585 		struct timeval timeout,
586 		const char *db_name, uint8_t db_flags,
587 		struct ctdb_db_context **out)
588 {
589 	TALLOC_CTX *mem_ctx;
590 	struct tevent_req *req;
591 	bool status;
592 	int ret;
593 
594 	mem_ctx = talloc_new(client);
595 	if (mem_ctx == NULL) {
596 		return ENOMEM;
597 	}
598 
599 	req = ctdb_attach_send(mem_ctx, ev, client, timeout,
600 			       db_name, db_flags);
601 	if (req == NULL) {
602 		talloc_free(mem_ctx);
603 		return ENOMEM;
604 	}
605 
606 	tevent_req_poll(req, ev);
607 
608 	status = ctdb_attach_recv(req, &ret, out);
609 	if (! status) {
610 		talloc_free(mem_ctx);
611 		return ret;
612 	}
613 
614 	/*
615 	ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
616 	ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
617 	ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
618 	*/
619 
620 	talloc_free(mem_ctx);
621 	return 0;
622 }
623 
/* Per-request state for ctdb_detach_send/_recv. */
struct ctdb_detach_state {
	struct ctdb_client_context *client;
	struct tevent_context *ev;
	struct timeval timeout;
	/* Id of the database to detach */
	uint32_t db_id;
	/* Name looked up via GET_DBNAME; used to find the local handle */
	const char *db_name;
};

/* Completion callbacks, in the order the detach proceeds */
static void ctdb_detach_dbname_done(struct tevent_req *subreq);
static void ctdb_detach_done(struct tevent_req *subreq);
634 
/*
 * Start detaching from the database with id db_id.
 *
 * The first step asks the client's own node for the database name
 * (GET_DBNAME); the detach itself is issued from
 * ctdb_detach_dbname_done().
 *
 * Returns NULL only if the request itself could not be created.
 */
struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx,
				    struct tevent_context *ev,
				    struct ctdb_client_context *client,
				    struct timeval timeout, uint32_t db_id)
{
	struct ctdb_detach_state *state;
	struct ctdb_req_control dbname_request;
	struct tevent_req *req;
	struct tevent_req *subreq;

	req = tevent_req_create(mem_ctx, &state, struct ctdb_detach_state);
	if (req == NULL) {
		return NULL;
	}

	state->client = client;
	state->ev = ev;
	state->timeout = timeout;
	state->db_id = db_id;

	ctdb_req_control_get_dbname(&dbname_request, db_id);
	subreq = ctdb_client_control_send(state, ev, client,
					  ctdb_client_pnn(client), timeout,
					  &dbname_request);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdb_detach_dbname_done, req);

	return req;
}
665 
ctdb_detach_dbname_done(struct tevent_req * subreq)666 static void ctdb_detach_dbname_done(struct tevent_req *subreq)
667 {
668 	struct tevent_req *req = tevent_req_callback_data(
669 		subreq, struct tevent_req);
670 	struct ctdb_detach_state *state = tevent_req_data(
671 		req, struct ctdb_detach_state);
672 	struct ctdb_reply_control *reply;
673 	struct ctdb_req_control request;
674 	int ret;
675 	bool status;
676 
677 	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
678 	TALLOC_FREE(subreq);
679 	if (! status) {
680 		DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
681 				  state->db_id, ret));
682 		tevent_req_error(req, ret);
683 		return;
684 	}
685 
686 	ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
687 	if (ret != 0) {
688 		DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
689 				  state->db_id, ret));
690 		tevent_req_error(req, ret);
691 		return;
692 	}
693 
694 	ctdb_req_control_db_detach(&request, state->db_id);
695 	subreq = ctdb_client_control_send(state, state->ev, state->client,
696 					  ctdb_client_pnn(state->client),
697 					  state->timeout, &request);
698 	if (tevent_req_nomem(subreq, req)) {
699 		return;
700 	}
701 	tevent_req_set_callback(subreq, ctdb_detach_done, req);
702 
703 }
704 
ctdb_detach_done(struct tevent_req * subreq)705 static void ctdb_detach_done(struct tevent_req *subreq)
706 {
707 	struct tevent_req *req = tevent_req_callback_data(
708 		subreq, struct tevent_req);
709 	struct ctdb_detach_state *state = tevent_req_data(
710 		req, struct ctdb_detach_state);
711 	struct ctdb_reply_control *reply;
712 	struct ctdb_db_context *db;
713 	int ret;
714 	bool status;
715 
716 	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
717 	TALLOC_FREE(subreq);
718 	if (! status) {
719 		DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
720 				  state->db_name, ret));
721 		tevent_req_error(req, ret);
722 		return;
723 	}
724 
725 	ret = ctdb_reply_control_db_detach(reply);
726 	if (ret != 0) {
727 		DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
728 				  state->db_name, ret));
729 		tevent_req_error(req, ret);
730 		return;
731 	}
732 
733 	db = client_db_handle(state->client, state->db_name);
734 	if (db != NULL) {
735 		DLIST_REMOVE(state->client->db, db);
736 		TALLOC_FREE(db);
737 	}
738 
739 	tevent_req_done(req);
740 }
741 
/*
 * Collect the result of ctdb_detach_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
bool ctdb_detach_recv(struct tevent_req *req, int *perr)
{
	int unix_err;

	if (! tevent_req_is_unix_error(req, &unix_err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}
	return false;
}
755 
ctdb_detach(struct tevent_context * ev,struct ctdb_client_context * client,struct timeval timeout,uint32_t db_id)756 int ctdb_detach(struct tevent_context *ev,
757 		struct ctdb_client_context *client,
758 		struct timeval timeout, uint32_t db_id)
759 {
760 	TALLOC_CTX *mem_ctx;
761 	struct tevent_req *req;
762 	int ret;
763 	bool status;
764 
765 	mem_ctx = talloc_new(client);
766 	if (mem_ctx == NULL) {
767 		return ENOMEM;
768 	}
769 
770 	req = ctdb_detach_send(mem_ctx, ev, client, timeout, db_id);
771 	if (req == NULL) {
772 		talloc_free(mem_ctx);
773 		return ENOMEM;
774 	}
775 
776 	tevent_req_poll(req, ev);
777 
778 	status = ctdb_detach_recv(req, &ret);
779 	if (! status) {
780 		talloc_free(mem_ctx);
781 		return ret;
782 	}
783 
784 	talloc_free(mem_ctx);
785 	return 0;
786 }
787 
ctdb_db_id(struct ctdb_db_context * db)788 uint32_t ctdb_db_id(struct ctdb_db_context *db)
789 {
790 	return db->db_id;
791 }
792 
793 struct ctdb_db_traverse_local_state {
794 	ctdb_rec_parser_func_t parser;
795 	void *private_data;
796 	bool extract_header;
797 	int error;
798 };
799 
/*
 * Per-record callback used by ctdb_db_traverse_local().
 *
 * Optionally extracts the ltdb header from the record data, then calls
 * the caller's parser with reqid 0.  The parser receives the header
 * when extraction was requested and NULL otherwise.
 *
 * Returns 0 to continue the traverse.  If header extraction or the
 * parser fails, the error is stored in the traverse state and 1 is
 * returned to stop the traverse.
 */
static int ctdb_db_traverse_local_handler(struct tdb_context *tdb,
					  TDB_DATA key, TDB_DATA data,
					  void *private_data)
{
	struct ctdb_db_traverse_local_state *state =
		(struct ctdb_db_traverse_local_state *)private_data;
	struct ctdb_ltdb_header header;
	struct ctdb_ltdb_header *header_arg = NULL;
	int rc;

	if (state->extract_header) {
		rc = ctdb_ltdb_header_extract(&data, &header);
		if (rc != 0) {
			state->error = rc;
			return 1;
		}
		header_arg = &header;
	}

	rc = state->parser(0, header_arg, key, data, state->private_data);
	if (rc != 0) {
		state->error = rc;
		return 1;
	}

	return 0;
}
829 
/*
 * Traverse the local tdb of an attached database.
 *
 * readonly selects tdb_traverse_read() over tdb_traverse().  With
 * extract_header set, the ltdb header is removed from each record's
 * data and passed to parser; otherwise parser gets a NULL header.
 *
 * Returns EIO if the tdb traverse itself returns -1, otherwise the
 * error recorded by the per-record handler (0 if every record was
 * processed successfully).
 */
int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
			   bool extract_header,
			   ctdb_rec_parser_func_t parser, void *private_data)
{
	struct ctdb_db_traverse_local_state state = {
		.parser = parser,
		.private_data = private_data,
		.extract_header = extract_header,
		.error = 0,
	};
	struct tdb_context *tdb = client_db_tdb(db);
	int rc;

	if (readonly) {
		rc = tdb_traverse_read(tdb, ctdb_db_traverse_local_handler,
				       &state);
	} else {
		rc = tdb_traverse(tdb, ctdb_db_traverse_local_handler,
				  &state);
	}

	return (rc == -1) ? EIO : state.error;
}
857 
858 struct ctdb_db_traverse_state {
859 	struct tevent_context *ev;
860 	struct ctdb_client_context *client;
861 	struct ctdb_db_context *db;
862 	uint32_t destnode;
863 	uint64_t srvid;
864 	struct timeval timeout;
865 	ctdb_rec_parser_func_t parser;
866 	void *private_data;
867 	int result;
868 };
869 
870 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
871 static void ctdb_db_traverse_started(struct tevent_req *subreq);
872 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
873 				     void *private_data);
874 static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
875 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
876 
/*
 * Start a traverse of database db on node destnode.
 *
 * The request first registers ctdb_db_traverse_handler() as message
 * handler for a srvid built from CTDB_SRVID_CLIENT_RANGE ORed with the
 * process id; the traverse control is sent once that registration has
 * completed.  parser is called with private_data for the records that
 * are delivered to the handler.
 *
 * Returns NULL only if the request itself could not be created.
 */
struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
					 struct tevent_context *ev,
					 struct ctdb_client_context *client,
					 struct ctdb_db_context *db,
					 uint32_t destnode,
					 struct timeval timeout,
					 ctdb_rec_parser_func_t parser,
					 void *private_data)
{
	struct ctdb_db_traverse_state *state;
	struct tevent_req *req;
	struct tevent_req *subreq;

	req = tevent_req_create(mem_ctx, &state,
				struct ctdb_db_traverse_state);
	if (req == NULL) {
		return NULL;
	}

	state->ev = ev;
	state->client = client;
	state->db = db;
	state->destnode = destnode;
	state->timeout = timeout;
	state->parser = parser;
	state->private_data = private_data;
	state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();

	subreq = ctdb_client_set_message_handler_send(state, ev, client,
						      state->srvid,
						      ctdb_db_traverse_handler,
						      req);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);

	return req;
}
915 
ctdb_db_traverse_handler_set(struct tevent_req * subreq)916 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
917 {
918 	struct tevent_req *req = tevent_req_callback_data(
919 		subreq, struct tevent_req);
920 	struct ctdb_db_traverse_state *state = tevent_req_data(
921 		req, struct ctdb_db_traverse_state);
922 	struct ctdb_traverse_start_ext traverse;
923 	struct ctdb_req_control request;
924 	int ret = 0;
925 	bool status;
926 
927 	status = ctdb_client_set_message_handler_recv(subreq, &ret);
928 	TALLOC_FREE(subreq);
929 	if (! status) {
930 		tevent_req_error(req, ret);
931 		return;
932 	}
933 
934 	traverse = (struct ctdb_traverse_start_ext) {
935 		.db_id = ctdb_db_id(state->db),
936 		.reqid = 0,
937 		.srvid = state->srvid,
938 		.withemptyrecords = false,
939 	};
940 
941 	ctdb_req_control_traverse_start_ext(&request, &traverse);
942 	subreq = ctdb_client_control_send(state, state->ev, state->client,
943 					  state->destnode, state->timeout,
944 					  &request);
945 	if (subreq == NULL) {
946 		state->result = ENOMEM;
947 		ctdb_db_traverse_remove_handler(req);
948 		return;
949 	}
950 	tevent_req_set_callback(subreq, ctdb_db_traverse_started, req);
951 }
952 
ctdb_db_traverse_started(struct tevent_req * subreq)953 static void ctdb_db_traverse_started(struct tevent_req *subreq)
954 {
955 	struct tevent_req *req = tevent_req_callback_data(
956 		subreq, struct tevent_req);
957 	struct ctdb_db_traverse_state *state = tevent_req_data(
958 		req, struct ctdb_db_traverse_state);
959 	struct ctdb_reply_control *reply;
960 	int ret = 0;
961 	bool status;
962 
963 	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
964 	TALLOC_FREE(subreq);
965 	if (! status) {
966 		DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
967 		state->result = ret;
968 		ctdb_db_traverse_remove_handler(req);
969 		return;
970 	}
971 
972 	ret = ctdb_reply_control_traverse_start_ext(reply);
973 	talloc_free(reply);
974 	if (ret != 0) {
975 		DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
976 				  ret));
977 		state->result = ret;
978 		ctdb_db_traverse_remove_handler(req);
979 		return;
980 	}
981 }
982 
/*
 * Message handler that receives one traverse record per message.
 *
 * - A message that cannot be parsed is ignored.
 * - A record with empty key and empty data ends the traverse: the
 *   message handler is removed, which completes the request.
 * - A record whose ltdb header cannot be extracted, or whose data is
 *   empty once the header is removed, is skipped.
 * - Every other record is passed to the caller's parser; a non-zero
 *   parser result is recorded and stops the traverse.
 */
static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
				     void *private_data)
{
	struct tevent_req *req = talloc_get_type_abort(
		private_data, struct tevent_req);
	struct ctdb_db_traverse_state *state = tevent_req_data(
		req, struct ctdb_db_traverse_state);
	struct ctdb_ltdb_header header;
	struct ctdb_rec_data *rec;
	size_t consumed;
	int rc;

	rc = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec,
				&consumed);
	if (rc != 0) {
		return;
	}

	/* End-of-traverse marker */
	if (rec->key.dsize == 0 && rec->data.dsize == 0) {
		talloc_free(rec);
		ctdb_db_traverse_remove_handler(req);
		return;
	}

	/* Skip records without a usable header or without payload */
	if (ctdb_ltdb_header_extract(&rec->data, &header) != 0 ||
	    rec->data.dsize == 0) {
		talloc_free(rec);
		return;
	}

	rc = state->parser(rec->reqid, &header, rec->key, rec->data,
			   state->private_data);
	talloc_free(rec);
	if (rc == 0) {
		return;
	}

	state->result = rc;
	ctdb_db_traverse_remove_handler(req);
}
1025 
ctdb_db_traverse_remove_handler(struct tevent_req * req)1026 static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
1027 {
1028 	struct ctdb_db_traverse_state *state = tevent_req_data(
1029 		req, struct ctdb_db_traverse_state);
1030 	struct tevent_req *subreq;
1031 
1032 	subreq = ctdb_client_remove_message_handler_send(state, state->ev,
1033 							 state->client,
1034 							 state->srvid, req);
1035 	if (tevent_req_nomem(subreq, req)) {
1036 		return;
1037 	}
1038 	tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req);
1039 }
1040 
ctdb_db_traverse_handler_removed(struct tevent_req * subreq)1041 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
1042 {
1043 	struct tevent_req *req = tevent_req_callback_data(
1044 		subreq, struct tevent_req);
1045 	struct ctdb_db_traverse_state *state = tevent_req_data(
1046 		req, struct ctdb_db_traverse_state);
1047 	int ret;
1048 	bool status;
1049 
1050 	status = ctdb_client_remove_message_handler_recv(subreq, &ret);
1051 	TALLOC_FREE(subreq);
1052 	if (! status) {
1053 		tevent_req_error(req, ret);
1054 		return;
1055 	}
1056 
1057 	if (state->result != 0) {
1058 		tevent_req_error(req, state->result);
1059 		return;
1060 	}
1061 
1062 	tevent_req_done(req);
1063 }
1064 
/*
 * Collect the result of a traverse request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
1078 
/*
 * Synchronous wrapper around ctdb_db_traverse_send()/_recv().
 *
 * Runs the event loop until the traverse request completes.
 * Returns 0 on success, ENOMEM if the request could not be created,
 * or the error reported by ctdb_db_traverse_recv().
 */
int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
		     struct ctdb_client_context *client,
		     struct ctdb_db_context *db,
		     uint32_t destnode, struct timeval timeout,
		     ctdb_rec_parser_func_t parser, void *private_data)
{
	struct tevent_req *req;
	int err = 0;

	req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
				    timeout, parser, private_data);
	if (req == NULL) {
		return ENOMEM;
	}

	tevent_req_poll(req, ev);

	if (ctdb_db_traverse_recv(req, &err)) {
		return 0;
	}

	return err;
}
1104 
/*
 * Fetch a record from the local tdb and split it into header and data.
 *
 * db      - database to read from
 * key     - record key
 * header  - filled with the record's ltdb header; if the record does not
 *           exist it is zeroed with dmaster set to CTDB_UNKNOWN_PNN
 * mem_ctx - talloc context for the copy of the data
 * data    - if not NULL, receives a talloc copy of the data following
 *           the header (tdb_null if the record does not exist)
 *
 * Returns 0 on success, EIO if the fetch failed for a reason other than
 * a missing record, ENOMEM if the data could not be copied, or the error
 * returned by ctdb_ltdb_header_pull().
 */
int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
		    struct ctdb_ltdb_header *header,
		    TALLOC_CTX *mem_ctx, TDB_DATA *data)
{
	TDB_DATA rec;
	size_t np;
	int ret;

	rec = tdb_fetch(client_db_tdb(db), key);
	if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
		/* No record present */
		free(rec.dptr);

		if (tdb_error(client_db_tdb(db)) != TDB_ERR_NOEXIST) {
			return EIO;
		}

		*header = (struct ctdb_ltdb_header) {
			.dmaster = CTDB_UNKNOWN_PNN,
		};

		if (data != NULL) {
			*data = tdb_null;
		}
		return 0;
	}

	ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header, &np);
	if (ret != 0) {
		/* rec.dptr was malloc'ed by tdb_fetch(), do not leak it */
		free(rec.dptr);
		return ret;
	}

	ret = 0;
	if (data != NULL) {
		/* Everything after the header is the record data */
		data->dsize = rec.dsize - np;
		data->dptr = talloc_memdup(mem_ctx, rec.dptr + np,
					   data->dsize);
		if (data->dptr == NULL) {
			ret = ENOMEM;
		}
	}

	free(rec.dptr);
	return ret;
}
1152 
1153 /*
1154  * Fetch a record from volatile database
1155  *
1156  * Steps:
1157  *  1. Get a lock on the hash chain
1158  *  2. If the record does not exist, migrate the record
1159  *  3. If readonly=true and delegations do not exist, migrate the record.
1160  *  4. If readonly=false and delegations exist, migrate the record.
1161  *  5. If the local node is not dmaster, migrate the record.
1162  *  6. Return record
1163  */
1164 
/* State of an asynchronous fetch-lock request */
struct ctdb_fetch_lock_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	/* Record handle being prepared; returned by ctdb_fetch_lock_recv() */
	struct ctdb_record_handle *h;
	/* True if the caller asked for readonly access */
	bool readonly;
	/* Result of ctdb_client_pnn(client), compared with the dmaster */
	uint32_t pnn;
};

/* Steps of the fetch-lock request, defined below */
static int ctdb_fetch_lock_check(struct tevent_req *req);
static void ctdb_fetch_lock_migrate(struct tevent_req *req);
static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
1176 
/*
 * Start an asynchronous fetch-lock of a record.
 *
 * mem_ctx  - talloc parent of the request
 * ev       - event context used for the request
 * client   - ctdb client connection
 * db       - database holding the record; must be volatile, otherwise
 *            the request fails with EINVAL
 * key      - record key (copied into the record handle)
 * readonly - true to ask for readonly access
 *
 * The record handle is allocated as a talloc child of db.  If the record
 * can be used locally the request completes immediately, otherwise a
 * migration is started by ctdb_fetch_lock_check().
 *
 * Returns the request, or NULL if it could not be allocated.
 */
struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
					struct tevent_context *ev,
					struct ctdb_client_context *client,
					struct ctdb_db_context *db,
					TDB_DATA key, bool readonly)
{
	struct ctdb_fetch_lock_state *state;
	struct tevent_req *req;
	int ret;

	req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
	if (req == NULL) {
		return NULL;
	}

	state->ev = ev;
	state->client = client;

	state->h = talloc_zero(db, struct ctdb_record_handle);
	if (tevent_req_nomem(state->h, req)) {
		return tevent_req_post(req, ev);
	}
	state->h->ev = ev;
	state->h->client = client;
	state->h->db = db;
	state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
	if (tevent_req_nomem(state->h->key.dptr, req)) {
		return tevent_req_post(req, ev);
	}
	state->h->key.dsize = key.dsize;
	state->h->readonly = false;

	state->readonly = readonly;
	state->pnn = ctdb_client_pnn(client);

	/* Check that database is not persistent */
	if (! ctdb_db_volatile(db)) {
		DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
				  db->db_name));
		tevent_req_error(req, EINVAL);
		return tevent_req_post(req, ev);
	}

	ret = ctdb_fetch_lock_check(req);
	if (ret == 0) {
		tevent_req_done(req);
		return tevent_req_post(req, ev);
	}
	if (ret != EAGAIN) {
		tevent_req_error(req, ret);
		return tevent_req_post(req, ev);
	}

	/*
	 * EAGAIN means a migration was requested.  If starting the
	 * migration already failed the request (out of memory), the
	 * caller has not set a callback yet, so the request must be
	 * posted or its completion would never be delivered.
	 */
	if (! tevent_req_is_in_progress(req)) {
		return tevent_req_post(req, ev);
	}

	return req;
}
1231 
ctdb_fetch_lock_check(struct tevent_req * req)1232 static int ctdb_fetch_lock_check(struct tevent_req *req)
1233 {
1234 	struct ctdb_fetch_lock_state *state = tevent_req_data(
1235 		req, struct ctdb_fetch_lock_state);
1236 	struct ctdb_record_handle *h = state->h;
1237 	struct ctdb_ltdb_header header;
1238 	TDB_DATA data = tdb_null;
1239 	size_t np;
1240 	int ret, err = 0;
1241 	bool do_migrate = false;
1242 
1243 	ret = tdb_chainlock(client_db_tdb(h->db), h->key);
1244 	if (ret != 0) {
1245 		DEBUG(DEBUG_ERR,
1246 		      ("fetch_lock: %s tdb_chainlock failed, %s\n",
1247 		       h->db->db_name, tdb_errorstr(client_db_tdb(h->db))));
1248 		err = EIO;
1249 		goto failed;
1250 	}
1251 
1252 	data = tdb_fetch(client_db_tdb(h->db), h->key);
1253 	if (data.dptr == NULL) {
1254 		if (tdb_error(client_db_tdb(h->db)) == TDB_ERR_NOEXIST) {
1255 			goto migrate;
1256 		} else {
1257 			err = EIO;
1258 			goto failed;
1259 		}
1260 	}
1261 
1262 	/* Got the record */
1263 	ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header, &np);
1264 	if (ret != 0) {
1265 		err = ret;
1266 		goto failed;
1267 	}
1268 
1269 	if (! state->readonly) {
1270 		/* Read/write access */
1271 		if (header.dmaster == state->pnn &&
1272 		    header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1273 			goto migrate;
1274 		}
1275 
1276 		if (header.dmaster != state->pnn) {
1277 			goto migrate;
1278 		}
1279 	} else {
1280 		/* Readonly access */
1281 		if (header.dmaster != state->pnn &&
1282 		    ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1283 				       CTDB_REC_RO_HAVE_DELEGATIONS))) {
1284 			goto migrate;
1285 		}
1286 	}
1287 
1288 	/* We are the dmaster or readonly delegation */
1289 	h->header = header;
1290 	h->data = data;
1291 	if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1292 			    CTDB_REC_RO_HAVE_DELEGATIONS)) {
1293 		h->readonly = true;
1294 	}
1295 	return 0;
1296 
1297 migrate:
1298 	do_migrate = true;
1299 	err = EAGAIN;
1300 
1301 failed:
1302 	if (data.dptr != NULL) {
1303 		free(data.dptr);
1304 	}
1305 	ret = tdb_chainunlock(client_db_tdb(h->db), h->key);
1306 	if (ret != 0) {
1307 		DEBUG(DEBUG_ERR,
1308 		      ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1309 		       h->db->db_name, tdb_errorstr(client_db_tdb(h->db))));
1310 		return EIO;
1311 	}
1312 
1313 	if (do_migrate) {
1314 		ctdb_fetch_lock_migrate(req);
1315 	}
1316 	return err;
1317 }
1318 
ctdb_fetch_lock_migrate(struct tevent_req * req)1319 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
1320 {
1321 	struct ctdb_fetch_lock_state *state = tevent_req_data(
1322 		req, struct ctdb_fetch_lock_state);
1323 	struct ctdb_req_call request;
1324 	struct tevent_req *subreq;
1325 
1326 	ZERO_STRUCT(request);
1327 	request.flags = CTDB_IMMEDIATE_MIGRATION;
1328 	if (state->readonly) {
1329 		request.flags |= CTDB_WANT_READONLY;
1330 	}
1331 	request.db_id = state->h->db->db_id;
1332 	request.callid = CTDB_NULL_FUNC;
1333 	request.key = state->h->key;
1334 	request.calldata = tdb_null;
1335 
1336 	subreq = ctdb_client_call_send(state, state->ev, state->client,
1337 				       &request);
1338 	if (tevent_req_nomem(subreq, req)) {
1339 		return;
1340 	}
1341 
1342 	tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
1343 }
1344 
ctdb_fetch_lock_migrate_done(struct tevent_req * subreq)1345 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
1346 {
1347 	struct tevent_req *req = tevent_req_callback_data(
1348 		subreq, struct tevent_req);
1349 	struct ctdb_fetch_lock_state *state = tevent_req_data(
1350 		req, struct ctdb_fetch_lock_state);
1351 	struct ctdb_reply_call *reply;
1352 	int ret;
1353 	bool status;
1354 
1355 	status = ctdb_client_call_recv(subreq, state, &reply, &ret);
1356 	TALLOC_FREE(subreq);
1357 	if (! status) {
1358 		DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
1359 				  state->h->db->db_name, ret));
1360 		tevent_req_error(req, ret);
1361 		return;
1362 	}
1363 
1364 	if (reply->status != 0) {
1365 		tevent_req_error(req, EIO);
1366 		return;
1367 	}
1368 	talloc_free(reply);
1369 
1370 	ret = ctdb_fetch_lock_check(req);
1371 	if (ret != 0) {
1372 		if (ret != EAGAIN) {
1373 			tevent_req_error(req, ret);
1374 		}
1375 		return;
1376 	}
1377 
1378 	tevent_req_done(req);
1379 }
1380 
ctdb_record_handle_destructor(struct ctdb_record_handle * h)1381 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
1382 {
1383 	int ret;
1384 
1385 	ret = tdb_chainunlock(client_db_tdb(h->db), h->key);
1386 	if (ret != 0) {
1387 		DEBUG(DEBUG_ERR,
1388 		      ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1389 		       h->db->db_name, tdb_errorstr(client_db_tdb(h->db))));
1390 	}
1391 	free(h->data.dptr);
1392 	return 0;
1393 }
1394 
/*
 * Collect the result of a fetch-lock request.
 *
 * req     - request created by ctdb_fetch_lock_send()
 * header  - if not NULL, receives the record's ltdb header
 * mem_ctx - talloc context for the copy of the record data
 * data    - if not NULL, receives a talloc copy of the data following
 *           the header (dptr is NULL when there is no data)
 * perr    - if not NULL, receives the error code on failure
 *
 * Returns the record handle on success; freeing the handle releases the
 * chain lock.  Returns NULL on failure, in which case the handle has
 * been freed.
 */
struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
						struct ctdb_ltdb_header *header,
						TALLOC_CTX *mem_ctx,
						TDB_DATA *data, int *perr)
{
	struct ctdb_fetch_lock_state *state = tevent_req_data(
		req, struct ctdb_fetch_lock_state);
	struct ctdb_record_handle *h = state->h;
	int err;

	if (tevent_req_is_unix_error(req, &err)) {
		/*
		 * The handle is a talloc child of the database, not of the
		 * request, so it must be freed here even when the caller
		 * is not interested in the error code.
		 */
		TALLOC_FREE(state->h);
		if (perr != NULL) {
			*perr = err;
		}
		return NULL;
	}

	/*
	 * The request succeeded, so the chain lock is held and h->data
	 * owns the tdb buffer.  Install the destructor before anything
	 * can fail, so that freeing the handle on the error path below
	 * releases both.
	 */
	talloc_set_destructor(h, ctdb_record_handle_destructor);

	if (header != NULL) {
		*header = h->header;
	}
	if (data != NULL) {
		size_t offset;

		/* h->data is the raw record, the data follows the header */
		offset = ctdb_ltdb_header_len(&h->header);

		data->dsize = h->data.dsize - offset;
		if (data->dsize == 0) {
			data->dptr = NULL;
		} else {
			data->dptr = talloc_memdup(mem_ctx,
						   h->data.dptr + offset,
						   data->dsize);
			if (data->dptr == NULL) {
				TALLOC_FREE(state->h);
				if (perr != NULL) {
					*perr = ENOMEM;
				}
				return NULL;
			}
		}
	}

	return h;
}
1441 
/*
 * Synchronous wrapper around ctdb_fetch_lock_send()/_recv().
 *
 * On success stores the record handle in *out and returns 0.
 * Returns ENOMEM if the request could not be created, or the error
 * reported by ctdb_fetch_lock_recv().
 */
int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
		    struct ctdb_client_context *client,
		    struct ctdb_db_context *db, TDB_DATA key, bool readonly,
		    struct ctdb_record_handle **out,
		    struct ctdb_ltdb_header *header, TDB_DATA *data)
{
	struct ctdb_record_handle *handle;
	struct tevent_req *req;
	int err = 0;

	req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
	if (req == NULL) {
		return ENOMEM;
	}

	tevent_req_poll(req, ev);

	handle = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &err);
	if (handle != NULL) {
		*out = handle;
		return 0;
	}

	return err;
}
1467 
/*
 * Store new data for a record obtained with fetch-lock.
 *
 * h    - record handle; must not be a readonly copy
 * data - new record data (without ltdb header)
 *
 * The record is written as the handle's header followed by data.  If
 * data is identical to the data currently cached in the handle, nothing
 * is written.
 *
 * Returns 0 on success, EINVAL for a readonly handle, EIO if the tdb
 * store failed.
 */
int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
{
	uint8_t header[sizeof(struct ctdb_ltdb_header)];
	TDB_DATA rec[2];
	uint8_t *copy;
	size_t offset, np;
	int ret;

	/* Cannot modify the record if it was obtained as a readonly copy */
	if (h->readonly) {
		return EINVAL;
	}

	/*
	 * Check if the new data is same.  h->data is the whole record
	 * (header followed by data), so skip the header before comparing.
	 */
	offset = ctdb_ltdb_header_len(&h->header);
	if (h->data.dptr != NULL &&
	    h->data.dsize >= offset &&
	    h->data.dsize - offset == data.dsize &&
	    (data.dsize == 0 ||
	     memcmp(h->data.dptr + offset, data.dptr, data.dsize) == 0)) {
		/* No need to do anything */
		return 0;
	}

	ctdb_ltdb_header_push(&h->header, header, &np);

	rec[0].dsize = np;
	rec[0].dptr = header;

	rec[1].dsize = data.dsize;
	rec[1].dptr = data.dptr;

	ret = tdb_storev(client_db_tdb(h->db), h->key, rec, 2, TDB_REPLACE);
	if (ret != 0) {
		DEBUG(DEBUG_ERR,
		      ("store_record: %s tdb_storev failed, %s\n",
		       h->db->db_name, tdb_errorstr(client_db_tdb(h->db))));
		return EIO;
	}

	/*
	 * Keep the cached record in step with what was just written, so
	 * that a later store is compared against the current contents.
	 * If the copy cannot be allocated the cache is dropped, which
	 * makes the next store write unconditionally.
	 */
	copy = malloc(np + data.dsize);
	if (copy != NULL) {
		memcpy(copy, header, np);
		if (data.dsize > 0) {
			memcpy(copy + np, data.dptr, data.dsize);
		}
	}
	free(h->data.dptr);
	h->data.dptr = copy;
	h->data.dsize = (copy != NULL) ? np + data.dsize : 0;

	return 0;
}
1505 
/* State of an asynchronous delete-record request */
struct ctdb_delete_record_state {
	/* Handle of the record being deleted; used for logging on failure */
	struct ctdb_record_handle *h;
};

/* Completion callback for the SCHEDULE_FOR_DELETION control */
static void ctdb_delete_record_done(struct tevent_req *subreq);
1511 
/*
 * Start an asynchronous delete of a record obtained with fetch-lock.
 *
 * The local record is replaced by one consisting of the header only,
 * and a SCHEDULE_FOR_DELETION control is then sent to the local node.
 * A readonly handle fails the request with EINVAL, a failed tdb store
 * with EIO.
 *
 * Returns the request, or NULL if it could not be allocated.
 */
struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
					   struct tevent_context *ev,
					   struct ctdb_record_handle *h)
{
	struct ctdb_delete_record_state *state;
	struct tevent_req *req, *subreq;
	struct ctdb_req_control request;
	struct ctdb_key_data key;
	uint8_t hdr_buf[sizeof(struct ctdb_ltdb_header)];
	TDB_DATA empty_rec;
	size_t hdr_len;

	req = tevent_req_create(mem_ctx, &state,
				struct ctdb_delete_record_state);
	if (req == NULL) {
		return NULL;
	}

	state->h = h;

	/* Cannot delete the record if it was obtained as a readonly copy */
	if (h->readonly) {
		DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
				  h->db->db_name));
		tevent_req_error(req, EINVAL);
		return tevent_req_post(req, ev);
	}

	/* Overwrite the record with just its header */
	ctdb_ltdb_header_push(&h->header, hdr_buf, &hdr_len);
	empty_rec.dptr = hdr_buf;
	empty_rec.dsize = hdr_len;

	if (tdb_store(client_db_tdb(h->db), h->key, empty_rec,
		      TDB_REPLACE) != 0) {
		D_ERR("fetch_lock delete: %s tdb_store failed, %s\n",
		      h->db->db_name,
		      tdb_errorstr(client_db_tdb(h->db)));
		tevent_req_error(req, EIO);
		return tevent_req_post(req, ev);
	}

	key.db_id = h->db->db_id;
	key.header = h->header;
	key.key = h->key;

	ctdb_req_control_schedule_for_deletion(&request, &key);
	subreq = ctdb_client_control_send(state, ev, h->client,
					  ctdb_client_pnn(h->client),
					  tevent_timeval_zero(),
					  &request);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdb_delete_record_done, req);

	return req;
}
1571 
ctdb_delete_record_done(struct tevent_req * subreq)1572 static void ctdb_delete_record_done(struct tevent_req *subreq)
1573 {
1574 	struct tevent_req *req = tevent_req_callback_data(
1575 		subreq, struct tevent_req);
1576 	struct ctdb_delete_record_state *state = tevent_req_data(
1577 		req, struct ctdb_delete_record_state);
1578 	int ret;
1579 	bool status;
1580 
1581 	status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1582 	TALLOC_FREE(subreq);
1583 	if (! status) {
1584 		D_ERR("delete_record: %s SCHEDULE_FOR_DELETION failed, ret=%d\n",
1585 		      state->h->db->db_name,
1586 		      ret);
1587 		tevent_req_error(req, ret);
1588 		return;
1589 	}
1590 
1591 	tevent_req_done(req);
1592 }
1593 
/*
 * Collect the result of a delete-record request.
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (! tevent_req_is_unix_error(req, &err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = err;
	}
	return false;
}
1607 
1608 
ctdb_delete_record(struct ctdb_record_handle * h)1609 int ctdb_delete_record(struct ctdb_record_handle *h)
1610 {
1611 	struct tevent_context *ev = h->ev;
1612 	TALLOC_CTX *mem_ctx;
1613 	struct tevent_req *req;
1614 	int ret;
1615 	bool status;
1616 
1617 	mem_ctx = talloc_new(NULL);
1618 	if (mem_ctx == NULL) {
1619 		return ENOMEM;
1620 	}
1621 
1622 	req = ctdb_delete_record_send(mem_ctx, ev, h);
1623 	if (req == NULL) {
1624 		talloc_free(mem_ctx);
1625 		return ENOMEM;
1626 	}
1627 
1628 	tevent_req_poll(req, ev);
1629 
1630 	status = ctdb_delete_record_recv(req, &ret);
1631 	talloc_free(mem_ctx);
1632 	if (! status) {
1633 		return ret;
1634 	}
1635 
1636 	return 0;
1637 }
1638 
1639 /*
1640  * Global lock functions
1641  */
1642 
/* State of an asynchronous g_lock lock request */
struct ctdb_g_lock_lock_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct ctdb_db_context *db;
	/* Lock name including the terminating NUL; points at the caller's string */
	TDB_DATA key;
	/* Server id the lock is requested for */
	struct ctdb_server_id my_sid;
	/* CTDB_G_LOCK_READ or CTDB_G_LOCK_WRITE, derived from the readonly flag */
	enum ctdb_g_lock_type lock_type;
	/* Handle of the fetch-locked g_lock record */
	struct ctdb_record_handle *h;
	/* state for verification of active locks */
	struct ctdb_g_lock_list *lock_list;
	/* Index of the lock_list entry currently being examined */
	unsigned int current;
};

/* Steps of the g_lock lock request, defined below */
static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
static int ctdb_g_lock_lock_update(struct tevent_req *req);
static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1661 
ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,enum ctdb_g_lock_type l2)1662 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1663 				  enum ctdb_g_lock_type l2)
1664 {
1665 	if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1666 		return false;
1667 	}
1668 	return true;
1669 }
1670 
/*
 * Start an asynchronous request for a g_lock.
 *
 * keyname  - lock name; the record key is the string including its
 *            terminating NUL and refers to the caller's buffer
 * sid      - server id requesting the lock (copied)
 * readonly - true for a read lock, false for a write lock
 *
 * The g_lock record itself is always fetch-locked for read/write.
 * Returns the request, or NULL if it could not be allocated.
 */
struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
					 struct tevent_context *ev,
					 struct ctdb_client_context *client,
					 struct ctdb_db_context *db,
					 const char *keyname,
					 struct ctdb_server_id *sid,
					 bool readonly)
{
	struct ctdb_g_lock_lock_state *state;
	struct tevent_req *req, *subreq;

	req = tevent_req_create(mem_ctx, &state,
				struct ctdb_g_lock_lock_state);
	if (req == NULL) {
		return NULL;
	}

	state->ev = ev;
	state->client = client;
	state->db = db;
	state->my_sid = *sid;
	if (readonly) {
		state->lock_type = CTDB_G_LOCK_READ;
	} else {
		state->lock_type = CTDB_G_LOCK_WRITE;
	}
	state->key.dptr = discard_const(keyname);
	state->key.dsize = strlen(keyname) + 1;

	subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
				      false);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);

	return req;
}
1705 
ctdb_g_lock_lock_fetched(struct tevent_req * subreq)1706 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1707 {
1708 	struct tevent_req *req = tevent_req_callback_data(
1709 		subreq, struct tevent_req);
1710 	struct ctdb_g_lock_lock_state *state = tevent_req_data(
1711 		req, struct ctdb_g_lock_lock_state);
1712 	TDB_DATA data;
1713 	size_t np;
1714 	int ret = 0;
1715 
1716 	state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1717 	TALLOC_FREE(subreq);
1718 	if (state->h == NULL) {
1719 		DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
1720 				  (char *)state->key.dptr));
1721 		tevent_req_error(req, ret);
1722 		return;
1723 	}
1724 
1725 	if (state->lock_list != NULL) {
1726 		TALLOC_FREE(state->lock_list);
1727 		state->current = 0;
1728 	}
1729 
1730 	ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1731 				    &state->lock_list, &np);
1732 	talloc_free(data.dptr);
1733 	if (ret != 0) {
1734 		DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
1735 				  (char *)state->key.dptr));
1736 		tevent_req_error(req, ret);
1737 		return;
1738 	}
1739 
1740 	ctdb_g_lock_lock_process_locks(req);
1741 }
1742 
/*
 * Walk the list of existing locks, starting at state->current.
 *
 * - An entry with our own server id fails the request with EDEADLK.
 * - On the first conflicting entry a PROCESS_EXISTS control is sent to
 *   the node given by the entry's server id; ctdb_g_lock_lock_checked()
 *   continues from there.
 * - If no entry conflicts, our lock is appended to the list, the record
 *   is updated and the request completes.
 */
static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
{
	struct ctdb_g_lock_lock_state *state = tevent_req_data(
		req, struct ctdb_g_lock_lock_state);
	struct tevent_req *subreq;
	struct ctdb_g_lock *lock;
	bool check_server = false;
	int ret;

	while (state->current < state->lock_list->num) {
		lock = &state->lock_list->lock[state->current];

		/* We should not ask for the same lock more than once */
		if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
			DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
					  (char *)state->key.dptr));
			tevent_req_error(req, EDEADLK);
			return;
		}

		if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
			/* state->current stays on the conflicting entry */
			check_server = true;
			break;
		}

		state->current += 1;
	}

	if (check_server) {
		struct ctdb_req_control request;

		/* Ask the holder's node whether the holding process exists */
		ctdb_req_control_process_exists(&request, lock->sid.pid);
		subreq = ctdb_client_control_send(state, state->ev,
						  state->client,
						  lock->sid.vnn,
						  tevent_timeval_zero(),
						  &request);
		if (tevent_req_nomem(subreq, req)) {
			return;
		}
		tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
		return;
	}

	/* There is no conflict, add ourself to the lock_list */
	state->lock_list->lock = talloc_realloc(state->lock_list,
						state->lock_list->lock,
						struct ctdb_g_lock,
						state->lock_list->num + 1);
	if (state->lock_list->lock == NULL) {
		tevent_req_error(req, ENOMEM);
		return;
	}

	lock = &state->lock_list->lock[state->lock_list->num];
	lock->type = state->lock_type;
	lock->sid = state->my_sid;
	state->lock_list->num += 1;

	/* Write the extended list back to the record */
	ret = ctdb_g_lock_lock_update(req);
	if (ret != 0) {
		tevent_req_error(req, ret);
		return;
	}

	/* Done with the record, release the handle */
	TALLOC_FREE(state->h);
	tevent_req_done(req);
}
1811 
ctdb_g_lock_lock_checked(struct tevent_req * subreq)1812 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1813 {
1814 	struct tevent_req *req = tevent_req_callback_data(
1815 		subreq, struct tevent_req);
1816 	struct ctdb_g_lock_lock_state *state = tevent_req_data(
1817 		req, struct ctdb_g_lock_lock_state);
1818 	struct ctdb_reply_control *reply;
1819 	int ret, value;
1820 	bool status;
1821 
1822 	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1823 	TALLOC_FREE(subreq);
1824 	if (! status) {
1825 		DEBUG(DEBUG_ERR,
1826 		      ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1827 		       (char *)state->key.dptr, ret));
1828 		tevent_req_error(req, ret);
1829 		return;
1830 	}
1831 
1832 	ret = ctdb_reply_control_process_exists(reply, &value);
1833 	if (ret != 0) {
1834 		tevent_req_error(req, ret);
1835 		return;
1836 	}
1837 	talloc_free(reply);
1838 
1839 	if (value == 0) {
1840 		/* server process exists, need to retry */
1841 		TALLOC_FREE(state->h);
1842 		subreq = tevent_wakeup_send(state, state->ev,
1843 					    tevent_timeval_current_ofs(0,1000));
1844 		if (tevent_req_nomem(subreq, req)) {
1845 			return;
1846 		}
1847 		tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1848 		return;
1849 	}
1850 
1851 	/* server process does not exist, remove conflicting entry */
1852 	state->lock_list->lock[state->current] =
1853 		state->lock_list->lock[state->lock_list->num-1];
1854 	state->lock_list->num -= 1;
1855 
1856 	ret = ctdb_g_lock_lock_update(req);
1857 	if (ret != 0) {
1858 		tevent_req_error(req, ret);
1859 		return;
1860 	}
1861 
1862 	ctdb_g_lock_lock_process_locks(req);
1863 }
1864 
ctdb_g_lock_lock_update(struct tevent_req * req)1865 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1866 {
1867 	struct ctdb_g_lock_lock_state *state = tevent_req_data(
1868 		req, struct ctdb_g_lock_lock_state);
1869 	TDB_DATA data;
1870 	size_t np;
1871 	int ret;
1872 
1873 	data.dsize = ctdb_g_lock_list_len(state->lock_list);
1874 	data.dptr = talloc_size(state, data.dsize);
1875 	if (data.dptr == NULL) {
1876 		return ENOMEM;
1877 	}
1878 
1879 	ctdb_g_lock_list_push(state->lock_list, data.dptr, &np);
1880 	ret = ctdb_store_record(state->h, data);
1881 	talloc_free(data.dptr);
1882 	return ret;
1883 }
1884 
ctdb_g_lock_lock_retry(struct tevent_req * subreq)1885 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1886 {
1887 	struct tevent_req *req = tevent_req_callback_data(
1888 		subreq, struct tevent_req);
1889 	struct ctdb_g_lock_lock_state *state = tevent_req_data(
1890 		req, struct ctdb_g_lock_lock_state);
1891 	bool success;
1892 
1893 	success = tevent_wakeup_recv(subreq);
1894 	TALLOC_FREE(subreq);
1895 	if (! success) {
1896 		tevent_req_error(req, ENOMEM);
1897 		return;
1898 	}
1899 
1900 	subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1901 				      state->db, state->key, false);
1902 	if (tevent_req_nomem(subreq, req)) {
1903 		return;
1904 	}
1905 	tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1906 }
1907 
ctdb_g_lock_lock_recv(struct tevent_req * req,int * perr)1908 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1909 {
1910 	struct ctdb_g_lock_lock_state *state = tevent_req_data(
1911 		req, struct ctdb_g_lock_lock_state);
1912 	int err;
1913 
1914 	TALLOC_FREE(state->h);
1915 
1916 	if (tevent_req_is_unix_error(req, &err)) {
1917 		if (perr != NULL) {
1918 			*perr = err;
1919 		}
1920 		return false;
1921 	}
1922 
1923 	return true;
1924 }
1925 
/* State of an asynchronous g_lock unlock request */
struct ctdb_g_lock_unlock_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct ctdb_db_context *db;
	/* Lock name including the terminating NUL; points at the caller's string */
	TDB_DATA key;
	/* Server id whose lock entry is to be removed */
	struct ctdb_server_id my_sid;
	/* Handle of the fetch-locked g_lock record */
	struct ctdb_record_handle *h;
	/* Lock list parsed from the record */
	struct ctdb_g_lock_list *lock_list;
};

/* Steps of the g_lock unlock request, defined below */
static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
static int ctdb_g_lock_unlock_update(struct tevent_req *req);
static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
1939 
/*
 * Start an asynchronous request to release a g_lock.
 *
 * keyname - lock name; the record key is the string including its
 *           terminating NUL and refers to the caller's buffer
 * sid     - server id whose lock entry is to be removed
 *
 * Returns the request, or NULL if it could not be allocated.
 */
struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
					   struct tevent_context *ev,
					   struct ctdb_client_context *client,
					   struct ctdb_db_context *db,
					   const char *keyname,
					   struct ctdb_server_id sid)
{
	struct ctdb_g_lock_unlock_state *state;
	struct tevent_req *req, *subreq;

	req = tevent_req_create(mem_ctx, &state,
				struct ctdb_g_lock_unlock_state);
	if (req == NULL) {
		return NULL;
	}

	state->ev = ev;
	state->client = client;
	state->db = db;
	state->my_sid = sid;
	state->key.dsize = strlen(keyname) + 1;
	state->key.dptr = discard_const(keyname);

	subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
				      false);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);

	return req;
}
1972 
ctdb_g_lock_unlock_fetched(struct tevent_req * subreq)1973 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1974 {
1975 	struct tevent_req *req = tevent_req_callback_data(
1976 		subreq, struct tevent_req);
1977 	struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1978 		req, struct ctdb_g_lock_unlock_state);
1979 	TDB_DATA data;
1980 	size_t np;
1981 	int ret = 0;
1982 
1983 	state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1984 	TALLOC_FREE(subreq);
1985 	if (state->h == NULL) {
1986 		DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
1987 				  (char *)state->key.dptr));
1988 		tevent_req_error(req, ret);
1989 		return;
1990 	}
1991 
1992 	ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1993 				    &state->lock_list, &np);
1994 	if (ret != 0) {
1995 		DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
1996 				  (char *)state->key.dptr));
1997 		tevent_req_error(req, ret);
1998 		return;
1999 	}
2000 
2001 	ret = ctdb_g_lock_unlock_update(req);
2002 	if (ret != 0) {
2003 		tevent_req_error(req, ret);
2004 		return;
2005 	}
2006 
2007 	if (state->lock_list->num == 0) {
2008 		subreq = ctdb_delete_record_send(state, state->ev, state->h);
2009 		if (tevent_req_nomem(subreq, req)) {
2010 			return;
2011 		}
2012 		tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
2013 					req);
2014 		return;
2015 	}
2016 
2017 	TALLOC_FREE(state->h);
2018 	tevent_req_done(req);
2019 }
2020 
ctdb_g_lock_unlock_update(struct tevent_req * req)2021 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
2022 {
2023 	struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2024 		req, struct ctdb_g_lock_unlock_state);
2025 	struct ctdb_g_lock *lock;
2026 	unsigned int i;
2027 	int ret;
2028 
2029 	for (i=0; i<state->lock_list->num; i++) {
2030 		lock = &state->lock_list->lock[i];
2031 
2032 		if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
2033 			break;
2034 		}
2035 	}
2036 
2037 	if (i < state->lock_list->num) {
2038 		state->lock_list->lock[i] =
2039 			state->lock_list->lock[state->lock_list->num-1];
2040 		state->lock_list->num -= 1;
2041 	}
2042 
2043 	if (state->lock_list->num != 0) {
2044 		TDB_DATA data;
2045 		size_t np;
2046 
2047 		data.dsize = ctdb_g_lock_list_len(state->lock_list);
2048 		data.dptr = talloc_size(state, data.dsize);
2049 		if (data.dptr == NULL) {
2050 			return ENOMEM;
2051 		}
2052 
2053 		ctdb_g_lock_list_push(state->lock_list, data.dptr, &np);
2054 		ret = ctdb_store_record(state->h, data);
2055 		talloc_free(data.dptr);
2056 		if (ret != 0) {
2057 			return ret;
2058 		}
2059 	}
2060 
2061 	return 0;
2062 }
2063 
ctdb_g_lock_unlock_deleted(struct tevent_req * subreq)2064 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
2065 {
2066 	struct tevent_req *req = tevent_req_callback_data(
2067 		subreq, struct tevent_req);
2068 	struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2069 		req, struct ctdb_g_lock_unlock_state);
2070 	int ret;
2071 	bool status;
2072 
2073 	status = ctdb_delete_record_recv(subreq, &ret);
2074 	if (! status) {
2075 		DEBUG(DEBUG_ERR,
2076 		      ("g_lock_unlock %s delete record failed, ret=%d\n",
2077 		       (char *)state->key.dptr, ret));
2078 		tevent_req_error(req, ret);
2079 		return;
2080 	}
2081 
2082 	TALLOC_FREE(state->h);
2083 	tevent_req_done(req);
2084 }
2085 
ctdb_g_lock_unlock_recv(struct tevent_req * req,int * perr)2086 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
2087 {
2088 	struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2089 		req, struct ctdb_g_lock_unlock_state);
2090 	int err;
2091 
2092 	TALLOC_FREE(state->h);
2093 
2094 	if (tevent_req_is_unix_error(req, &err)) {
2095 		if (perr != NULL) {
2096 			*perr = err;
2097 		}
2098 		return false;
2099 	}
2100 
2101 	return true;
2102 }
2103 
2104 /*
2105  * Persistent database functions
2106  */
/*
 * Request state for ctdb_transaction_start_send().
 */
struct ctdb_transaction_start_state {
	/* Event context the sub-requests are scheduled on */
	struct tevent_context *ev;
	/* Client connection used for the g_lock lock request */
	struct ctdb_client_context *client;
	/*
	 * NOTE(review): ctdb_transaction_start_send() does not assign this
	 * field - confirm whether it is still needed.
	 */
	struct timeval timeout;
	/* Transaction handle; returned by ctdb_transaction_start_recv() */
	struct ctdb_transaction_handle *h;
	/* Set from ctdb_client_pnn(client) when the request is created */
	uint32_t destnode;
};

/* Completion callbacks for the g_lock.tdb attach and g_lock lock steps */
static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
2117 
/*
 * Start a transaction on a database (async send).
 *
 * Volatile databases are rejected with EINVAL.  A transaction handle is
 * allocated as a talloc child of db (not of mem_ctx), and an attach to
 * "g_lock.tdb" is issued; ctdb_transaction_g_lock_attached() continues
 * from there.
 *
 * mem_ctx  - talloc parent of the returned request
 * ev       - event context to run the request on
 * client   - client connection
 * timeout  - passed to the g_lock.tdb attach request
 * db       - database to start the transaction on
 * readonly - stored in the handle and passed on to the g_lock lock
 *            request
 *
 * Returns the request, or NULL if it could not be allocated.  Because
 * the handle does not hang off the request, it is freed here explicitly
 * on every local failure path so that it does not stay attached to db.
 */
struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
					       struct tevent_context *ev,
					       struct ctdb_client_context *client,
					       struct timeval timeout,
					       struct ctdb_db_context *db,
					       bool readonly)
{
	struct ctdb_transaction_start_state *state;
	struct tevent_req *req, *subreq;
	struct ctdb_transaction_handle *h;

	req = tevent_req_create(mem_ctx, &state,
				struct ctdb_transaction_start_state);
	if (req == NULL) {
		return NULL;
	}

	if (ctdb_db_volatile(db)) {
		tevent_req_error(req, EINVAL);
		return tevent_req_post(req, ev);
	}

	state->ev = ev;
	state->client = client;
	state->destnode = ctdb_client_pnn(client);

	h = talloc_zero(db, struct ctdb_transaction_handle);
	if (tevent_req_nomem(h, req)) {
		return tevent_req_post(req, ev);
	}
	state->h = h;

	h->ev = ev;
	h->client = client;
	h->db = db;
	h->readonly = readonly;
	h->updated = false;

	/* SRVID is unique for databases, so client can have transactions
	 * active for multiple databases */
	h->sid = ctdb_client_get_server_id(client, db->db_id);

	h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
	if (tevent_req_nomem(h->recbuf, req)) {
		/* _recv will report failure, so nobody else can free h */
		TALLOC_FREE(state->h);
		return tevent_req_post(req, ev);
	}

	h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
	if (tevent_req_nomem(h->lock_name, req)) {
		TALLOC_FREE(state->h);
		return tevent_req_post(req, ev);
	}

	subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
	if (tevent_req_nomem(subreq, req)) {
		TALLOC_FREE(state->h);
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);

	return req;
}
2179 
ctdb_transaction_g_lock_attached(struct tevent_req * subreq)2180 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
2181 {
2182 	struct tevent_req *req = tevent_req_callback_data(
2183 		subreq, struct tevent_req);
2184 	struct ctdb_transaction_start_state *state = tevent_req_data(
2185 		req, struct ctdb_transaction_start_state);
2186 	bool status;
2187 	int ret;
2188 
2189 	status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
2190 	TALLOC_FREE(subreq);
2191 	if (! status) {
2192 		DEBUG(DEBUG_ERR,
2193 		      ("transaction_start: %s attach g_lock.tdb failed\n",
2194 		       state->h->db->db_name));
2195 		tevent_req_error(req, ret);
2196 		return;
2197 	}
2198 
2199 	subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
2200 				       state->h->db_g_lock,
2201 				       state->h->lock_name,
2202 				       &state->h->sid, state->h->readonly);
2203 	if (tevent_req_nomem(subreq, req)) {
2204 		return;
2205 	}
2206 	tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
2207 }
2208 
ctdb_transaction_g_lock_done(struct tevent_req * subreq)2209 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
2210 {
2211 	struct tevent_req *req = tevent_req_callback_data(
2212 		subreq, struct tevent_req);
2213 	struct ctdb_transaction_start_state *state = tevent_req_data(
2214 		req, struct ctdb_transaction_start_state);
2215 	int ret;
2216 	bool status;
2217 
2218 	status = ctdb_g_lock_lock_recv(subreq, &ret);
2219 	TALLOC_FREE(subreq);
2220 	if (! status) {
2221 		DEBUG(DEBUG_ERR,
2222 		      ("transaction_start: %s g_lock lock failed, ret=%d\n",
2223 		       state->h->db->db_name, ret));
2224 		tevent_req_error(req, ret);
2225 		return;
2226 	}
2227 
2228 	tevent_req_done(req);
2229 }
2230 
ctdb_transaction_start_recv(struct tevent_req * req,int * perr)2231 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
2232 					struct tevent_req *req,
2233 					int *perr)
2234 {
2235 	struct ctdb_transaction_start_state *state = tevent_req_data(
2236 		req, struct ctdb_transaction_start_state);
2237 	int err;
2238 
2239 	if (tevent_req_is_unix_error(req, &err)) {
2240 		if (perr != NULL) {
2241 			*perr = err;
2242 		}
2243 		return NULL;
2244 	}
2245 
2246 	return state->h;
2247 }
2248 
/*
 * Synchronous wrapper around ctdb_transaction_start_send()/_recv().
 *
 * mem_ctx is used only as the parent of the temporary request; the
 * request is freed before returning so that it does not stay on the
 * caller's context.
 *
 * On success stores the transaction handle in *out and returns 0.
 * Returns ENOMEM if the request cannot be created, otherwise the error
 * reported by ctdb_transaction_start_recv().  *out is left untouched on
 * failure.
 */
int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
			   struct ctdb_client_context *client,
			   struct timeval timeout,
			   struct ctdb_db_context *db, bool readonly,
			   struct ctdb_transaction_handle **out)
{
	struct tevent_req *req;
	struct ctdb_transaction_handle *h;
	int ret = 0;

	req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
					  readonly);
	if (req == NULL) {
		return ENOMEM;
	}

	tevent_req_poll(req, ev);

	h = ctdb_transaction_start_recv(req, &ret);
	/* The result has been collected; the request is no longer needed */
	TALLOC_FREE(req);
	if (h == NULL) {
		return ret;
	}

	*out = h;
	return 0;
}
2275 
/*
 * Search state handed to ctdb_transaction_record_fetch_traverse() while
 * looking up a key in a transaction's record buffer.
 */
struct ctdb_transaction_record_fetch_state {
	/* key: the key being searched for; data: data of the matching record */
	TDB_DATA key, data;
	/* Header extracted from the matching record */
	struct ctdb_ltdb_header header;
	/* Set to true once a record with a matching key has been seen */
	bool found;
};
2281 
/*
 * Record buffer traverse callback: remember header and data of a record
 * whose key equals the key stored in the search state.
 *
 * Records with a different key are skipped.  A match does not return a
 * non-zero value, so if several buffered records carry the key, the
 * values of the last one visited are the ones kept (presumably a zero
 * return lets the traversal continue - confirm against
 * ctdb_rec_buffer_traverse()).
 *
 * Returns 0 normally and 1 if the header cannot be extracted from a
 * matching record.
 */
static int ctdb_transaction_record_fetch_traverse(
				uint32_t reqid,
				struct ctdb_ltdb_header *nullheader,
				TDB_DATA key, TDB_DATA data,
				void *private_data)
{
	struct ctdb_transaction_record_fetch_state *lookup = private_data;
	int ret;

	if (lookup->key.dsize != key.dsize) {
		return 0;
	}
	if (memcmp(lookup->key.dptr, key.dptr, key.dsize) != 0) {
		return 0;
	}

	ret = ctdb_ltdb_header_extract(&data, &lookup->header);
	if (ret != 0) {
		DEBUG(DEBUG_ERR,
		      ("record_fetch: Failed to extract header, "
		       "ret=%d\n", ret));
		return 1;
	}

	lookup->data = data;
	lookup->found = true;

	return 0;
}
2309 
/*
 * Look up key in the transaction's record buffer.
 *
 * header and data are optional outputs; each is filled in only when it
 * is not NULL and a record was found.  The returned data is not copied
 * here - it is whatever the traverse callback recorded.
 *
 * Returns 0 if a record was found, ENOENT if not, or the non-zero value
 * returned by ctdb_rec_buffer_traverse().
 */
static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
					 TDB_DATA key,
					 struct ctdb_ltdb_header *header,
					 TDB_DATA *data)
{
	struct ctdb_transaction_record_fetch_state lookup;
	int ret;

	lookup.found = false;
	lookup.key = key;

	ret = ctdb_rec_buffer_traverse(h->recbuf,
				       ctdb_transaction_record_fetch_traverse,
				       &lookup);
	if (ret != 0) {
		return ret;
	}

	if (!lookup.found) {
		return ENOENT;
	}

	if (header != NULL) {
		*header = lookup.header;
	}
	if (data != NULL) {
		*data = lookup.data;
	}

	return 0;
}
2340 
/*
 * Fetch a record within a transaction.
 *
 * If the key is already in the transaction's record buffer, a copy of
 * the buffered data is returned.  Otherwise the record is read with
 * ctdb_ltdb_fetch() and added to the record buffer.
 *
 * In both cases the data returned in *data is allocated on mem_ctx.
 *
 * Returns 0 on success, ENOMEM if the copy cannot be allocated, or the
 * error from ctdb_ltdb_fetch() / ctdb_rec_buffer_add().
 */
int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
				  TDB_DATA key,
				  TALLOC_CTX *mem_ctx, TDB_DATA *data)
{
	TDB_DATA buffered;
	struct ctdb_ltdb_header header;
	int ret;

	ret = ctdb_transaction_record_fetch(h, key, NULL, &buffered);
	if (ret != 0) {
		/* Not buffered: read the record and remember it */
		ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
		if (ret != 0) {
			return ret;
		}

		return ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key,
					   *data);
	}

	/* Hand out a private copy of the buffered data */
	data->dptr = talloc_memdup(mem_ctx, buffered.dptr, buffered.dsize);
	if (data->dptr == NULL) {
		return ENOMEM;
	}
	data->dsize = buffered.dsize;

	return 0;
}
2372 
/*
 * Store a record within a transaction.
 *
 * The current value of the record is looked up first, in the
 * transaction's record buffer and then via ctdb_ltdb_fetch().  If the
 * new data is byte-for-byte equal to the current data nothing is
 * recorded.  Otherwise the record is added to the record buffer with
 * dmaster set to the client's PNN and the RSN incremented, and the
 * handle is marked as updated.
 *
 * Returns 0 on success, EINVAL for a read-only transaction, ENOMEM if
 * the temporary context cannot be allocated, or the error from
 * ctdb_ltdb_fetch() / ctdb_rec_buffer_add().
 */
int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
				  TDB_DATA key, TDB_DATA data)
{
	TALLOC_CTX *tmp_ctx;
	struct ctdb_ltdb_header header;
	TDB_DATA old_data;
	int ret;

	if (h->readonly) {
		return EINVAL;
	}

	tmp_ctx = talloc_new(h);
	if (tmp_ctx == NULL) {
		return ENOMEM;
	}

	ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
	if (ret != 0) {
		ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
		if (ret != 0) {
			/* tmp_ctx hangs off h; do not leave it behind */
			talloc_free(tmp_ctx);
			return ret;
		}
	}

	/*
	 * Unchanged data: nothing to record.  The length check comes
	 * first so memcmp() is never called with NULL pointers when both
	 * values are empty.
	 */
	if (old_data.dsize == data.dsize &&
	    (data.dsize == 0 ||
	     memcmp(old_data.dptr, data.dptr, data.dsize) == 0)) {
		talloc_free(tmp_ctx);
		return 0;
	}

	header.dmaster = ctdb_client_pnn(h->client);
	header.rsn += 1;

	ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
	talloc_free(tmp_ctx);
	if (ret != 0) {
		return ret;
	}
	h->updated = true;

	return 0;
}
2416 
/*
 * Delete a record within a transaction.
 *
 * Implemented as a store of tdb_null for the key; the return value is
 * that of ctdb_transaction_store_record().
 */
int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
				   TDB_DATA key)
{
	TDB_DATA empty = tdb_null;

	return ctdb_transaction_store_record(h, key, empty);
}
2422 
ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle * h,uint64_t * seqnum)2423 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
2424 					    uint64_t *seqnum)
2425 {
2426 	const char *keyname = CTDB_DB_SEQNUM_KEY;
2427 	TDB_DATA key, data;
2428 	struct ctdb_ltdb_header header;
2429 	int ret;
2430 
2431 	key.dptr = discard_const(keyname);
2432 	key.dsize = strlen(keyname) + 1;
2433 
2434 	ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
2435 	if (ret != 0) {
2436 		DEBUG(DEBUG_ERR,
2437 		      ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2438 		       h->db->db_name, ret));
2439 		return ret;
2440 	}
2441 
2442 	if (data.dsize == 0) {
2443 		/* initial data */
2444 		*seqnum = 0;
2445 		return 0;
2446 	}
2447 
2448 	if (data.dsize != sizeof(uint64_t)) {
2449 		talloc_free(data.dptr);
2450 		return EINVAL;
2451 	}
2452 
2453 	*seqnum = *(uint64_t *)data.dptr;
2454 
2455 	talloc_free(data.dptr);
2456 	return 0;
2457 }
2458 
ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle * h,uint64_t seqnum)2459 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2460 					    uint64_t seqnum)
2461 {
2462 	const char *keyname = CTDB_DB_SEQNUM_KEY;
2463 	TDB_DATA key, data;
2464 
2465 	key.dptr = discard_const(keyname);
2466 	key.dsize = strlen(keyname) + 1;
2467 
2468 	data.dptr = (uint8_t *)&seqnum;
2469 	data.dsize = sizeof(seqnum);
2470 
2471 	return ctdb_transaction_store_record(h, key, data);
2472 }
2473 
/*
 * Request state for ctdb_transaction_commit_send().
 */
struct ctdb_transaction_commit_state {
	/* Event context the sub-requests are scheduled on */
	struct tevent_context *ev;
	/* Timeout passed to each TRANS3_COMMIT control */
	struct timeval timeout;
	/* Transaction being committed */
	struct ctdb_transaction_handle *h;
	/* Database sequence number read before the commit was attempted */
	uint64_t seqnum;
};

/* Completion callbacks for the TRANS3_COMMIT control and the g_lock unlock */
static void ctdb_transaction_commit_done(struct tevent_req *subreq);
static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
2483 
/*
 * Commit a transaction (async send).
 *
 * Reads the current database sequence number, records sequence number
 * + 1 in the transaction and sends a TRANS3_COMMIT control carrying the
 * transaction's record buffer to the client's own node.
 *
 * Returns the request, or NULL if it could not be allocated.  Errors
 * from the sequence number handling are delivered through the request.
 */
struct tevent_req *ctdb_transaction_commit_send(
					TALLOC_CTX *mem_ctx,
					struct tevent_context *ev,
					struct timeval timeout,
					struct ctdb_transaction_handle *h)
{
	struct ctdb_transaction_commit_state *state;
	struct tevent_req *req, *subreq;
	struct ctdb_req_control request;
	int ret;

	req = tevent_req_create(mem_ctx, &state,
				struct ctdb_transaction_commit_state);
	if (req == NULL) {
		return NULL;
	}

	state->h = h;
	state->ev = ev;
	state->timeout = timeout;

	/* Bump the database sequence number as part of the transaction */
	ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
	if (ret == 0) {
		ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
	}
	if (ret != 0) {
		tevent_req_error(req, ret);
		return tevent_req_post(req, ev);
	}

	ctdb_req_control_trans3_commit(&request, h->recbuf);
	subreq = ctdb_client_control_send(state, ev, h->client,
					  ctdb_client_pnn(h->client),
					  timeout, &request);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);

	return req;
}
2528 
ctdb_transaction_commit_done(struct tevent_req * subreq)2529 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2530 {
2531 	struct tevent_req *req = tevent_req_callback_data(
2532 		subreq, struct tevent_req);
2533 	struct ctdb_transaction_commit_state *state = tevent_req_data(
2534 		req, struct ctdb_transaction_commit_state);
2535 	struct ctdb_transaction_handle *h = state->h;
2536 	struct ctdb_reply_control *reply;
2537 	uint64_t seqnum;
2538 	int ret;
2539 	bool status;
2540 
2541 	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2542 	TALLOC_FREE(subreq);
2543 	if (! status) {
2544 		DEBUG(DEBUG_ERR,
2545 		      ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2546 		       h->db->db_name, ret));
2547 		tevent_req_error(req, ret);
2548 		return;
2549 	}
2550 
2551 	ret = ctdb_reply_control_trans3_commit(reply);
2552 	talloc_free(reply);
2553 
2554 	if (ret != 0) {
2555 		/* Control failed due to recovery */
2556 
2557 		ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
2558 		if (ret != 0) {
2559 			tevent_req_error(req, ret);
2560 			return;
2561 		}
2562 
2563 		if (seqnum == state->seqnum) {
2564 			struct ctdb_req_control request;
2565 
2566 			/* try again */
2567 			ctdb_req_control_trans3_commit(&request,
2568 						       state->h->recbuf);
2569 			subreq = ctdb_client_control_send(
2570 					state, state->ev, state->h->client,
2571 					ctdb_client_pnn(state->h->client),
2572 					state->timeout, &request);
2573 			if (tevent_req_nomem(subreq, req)) {
2574 				return;
2575 			}
2576 			tevent_req_set_callback(subreq,
2577 						ctdb_transaction_commit_done,
2578 						req);
2579 			return;
2580 		}
2581 
2582 		if (seqnum != state->seqnum + 1) {
2583 			DEBUG(DEBUG_ERR,
2584 			      ("transaction_commit: %s seqnum mismatch "
2585 			       "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
2586 			       state->h->db->db_name, seqnum, state->seqnum));
2587 			tevent_req_error(req, EIO);
2588 			return;
2589 		}
2590 	}
2591 
2592 	/* trans3_commit successful */
2593 	subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
2594 					 h->db_g_lock, h->lock_name, h->sid);
2595 	if (tevent_req_nomem(subreq, req)) {
2596 		return;
2597 	}
2598 	tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
2599 				req);
2600 }
2601 
ctdb_transaction_commit_g_lock_done(struct tevent_req * subreq)2602 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
2603 {
2604 	struct tevent_req *req = tevent_req_callback_data(
2605 		subreq, struct tevent_req);
2606 	struct ctdb_transaction_commit_state *state = tevent_req_data(
2607 		req, struct ctdb_transaction_commit_state);
2608 	int ret;
2609 	bool status;
2610 
2611 	status = ctdb_g_lock_unlock_recv(subreq, &ret);
2612 	TALLOC_FREE(subreq);
2613 	if (! status) {
2614 		DEBUG(DEBUG_ERR,
2615 		      ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2616 		       state->h->db->db_name, ret));
2617 		tevent_req_error(req, ret);
2618 		return;
2619 	}
2620 
2621 	talloc_free(state->h);
2622 	tevent_req_done(req);
2623 }
2624 
/*
 * Collect the result of ctdb_transaction_commit_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
{
	int error = 0;
	bool failed = tevent_req_is_unix_error(req, &error);

	if (failed && perr != NULL) {
		*perr = error;
	}

	return !failed;
}
2638 
ctdb_transaction_commit(struct ctdb_transaction_handle * h)2639 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2640 {
2641 	struct tevent_context *ev = h->ev;
2642 	TALLOC_CTX *mem_ctx;
2643 	struct tevent_req *req;
2644 	int ret;
2645 	bool status;
2646 
2647 	if (h->readonly || ! h->updated) {
2648 		return ctdb_transaction_cancel(h);
2649 	}
2650 
2651 	mem_ctx = talloc_new(NULL);
2652 	if (mem_ctx == NULL) {
2653 		return ENOMEM;
2654 	}
2655 
2656 	req = ctdb_transaction_commit_send(mem_ctx, ev,
2657 					   tevent_timeval_zero(), h);
2658 	if (req == NULL) {
2659 		talloc_free(mem_ctx);
2660 		return ENOMEM;
2661 	}
2662 
2663 	tevent_req_poll(req, ev);
2664 
2665 	status = ctdb_transaction_commit_recv(req, &ret);
2666 	if (! status) {
2667 		talloc_free(mem_ctx);
2668 		return ret;
2669 	}
2670 
2671 	talloc_free(mem_ctx);
2672 	return 0;
2673 }
2674 
/*
 * Request state for ctdb_transaction_cancel_send().
 */
struct ctdb_transaction_cancel_state {
	/* Event context the unlock sub-request is scheduled on */
	struct tevent_context *ev;
	/* Transaction being cancelled; freed when the unlock completes */
	struct ctdb_transaction_handle *h;
	/* Timeout given by the caller; stored but not read in the cancel code */
	struct timeval timeout;
};

/* Completion callback for the g_lock unlock */
static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
2682 
/*
 * Cancel a transaction (async send).
 *
 * Issues a g_lock unlock for the transaction's lock;
 * ctdb_transaction_cancel_done() frees the handle once the unlock has
 * completed, whether it succeeded or not.
 *
 * If the unlock request cannot be allocated, the handle is freed here,
 * so that a cancel which got as far as creating its request always
 * consumes the handle.
 *
 * Returns the request, or NULL if it could not be allocated (the handle
 * is left untouched in that case).
 */
struct tevent_req *ctdb_transaction_cancel_send(
					TALLOC_CTX *mem_ctx,
					struct tevent_context *ev,
					struct timeval timeout,
					struct ctdb_transaction_handle *h)
{
	struct tevent_req *req, *subreq;
	struct ctdb_transaction_cancel_state *state;

	req = tevent_req_create(mem_ctx, &state,
				struct ctdb_transaction_cancel_state);
	if (req == NULL) {
		return NULL;
	}

	state->ev = ev;
	state->h = h;
	state->timeout = timeout;

	subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client,
					 state->h->db_g_lock,
					 state->h->lock_name, state->h->sid);
	if (subreq == NULL) {
		/* No callback will run to release the handle */
		TALLOC_FREE(state->h);
	}
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdb_transaction_cancel_done,
				req);

	return req;
}
2713 
ctdb_transaction_cancel_done(struct tevent_req * subreq)2714 static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
2715 {
2716 	struct tevent_req *req = tevent_req_callback_data(
2717 		subreq, struct tevent_req);
2718 	struct ctdb_transaction_cancel_state *state = tevent_req_data(
2719 		req, struct ctdb_transaction_cancel_state);
2720 	int ret;
2721 	bool status;
2722 
2723 	status = ctdb_g_lock_unlock_recv(subreq, &ret);
2724 	TALLOC_FREE(subreq);
2725 	if (! status) {
2726 		DEBUG(DEBUG_ERR,
2727 		      ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2728 		       state->h->db->db_name, ret));
2729 		talloc_free(state->h);
2730 		tevent_req_error(req, ret);
2731 		return;
2732 	}
2733 
2734 	talloc_free(state->h);
2735 	tevent_req_done(req);
2736 }
2737 
/*
 * Collect the result of ctdb_transaction_cancel_send().
 *
 * Returns true on success.  On failure returns false and, if perr is
 * not NULL, stores the unix error code in *perr.
 */
bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
{
	int error = 0;
	bool failed = tevent_req_is_unix_error(req, &error);

	if (failed && perr != NULL) {
		*perr = error;
	}

	return !failed;
}
2751 
ctdb_transaction_cancel(struct ctdb_transaction_handle * h)2752 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2753 {
2754 	struct tevent_context *ev = h->ev;
2755 	struct tevent_req *req;
2756 	TALLOC_CTX *mem_ctx;
2757 	int ret;
2758 	bool status;
2759 
2760 	mem_ctx = talloc_new(NULL);
2761 	if (mem_ctx == NULL) {
2762 		talloc_free(h);
2763 		return ENOMEM;
2764 	}
2765 
2766 	req = ctdb_transaction_cancel_send(mem_ctx, ev,
2767 					   tevent_timeval_zero(), h);
2768 	if (req == NULL) {
2769 		talloc_free(mem_ctx);
2770 		talloc_free(h);
2771 		return ENOMEM;
2772 	}
2773 
2774 	tevent_req_poll(req, ev);
2775 
2776 	status = ctdb_transaction_cancel_recv(req, &ret);
2777 	if (! status) {
2778 		talloc_free(mem_ctx);
2779 		return ret;
2780 	}
2781 
2782 	talloc_free(mem_ctx);
2783 	return 0;
2784 }
2785 
/*
 * TODO:
 *
 * In the future, Samba should register SERVER_ID.
 * Make that structure the same as struct srvid {}.
 */
2792