1 /*	$NetBSD: cluster.c,v 1.1.1.1 2009/12/02 00:27:08 haad Exp $	*/
2 
3 /*
4  * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
5  *
6  * This copyrighted material is made available to anyone wishing to use,
7  * modify, copy, or redistribute it subject to the terms and conditions
8  * of the GNU Lesser General Public License v.2.1.
9  *
10  * You should have received a copy of the GNU Lesser General Public License
11  * along with this program; if not, write to the Free Software Foundation,
12  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
13  */
14 #include <errno.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <unistd.h>
18 #include <stdint.h>
19 #include <stdlib.h>
20 #include <signal.h>
21 #include <sys/socket.h> /* These are for OpenAIS CPGs */
22 #include <sys/select.h>
23 #include <sys/un.h>
24 #include <netinet/in.h>
25 #include <arpa/inet.h>
26 #include <corosync/corotypes.h>
27 #include <corosync/cpg.h>
28 #include <openais/saAis.h>
29 #include <openais/saCkpt.h>
30 
31 #include "dm-log-userspace.h"
32 #include "libdevmapper.h"
33 #include "functions.h"
34 #include "local.h"
35 #include "common.h"
36 #include "logging.h"
37 #include "link_mon.h"
38 #include "cluster.h"
39 
/*
 * str_ais_error - map an SaAisErrorT value to its symbolic name for log
 * messages; unrecognized values yield "ais_error_unknown".
 *
 * The whole conditional chain is wrapped in parentheses so the macro acts
 * as a single expression wherever it is expanded.  @x is evaluated several
 * times, so it must not have side effects.
 */
#define str_ais_error(x)						\
	(((x) == SA_AIS_OK) ? "SA_AIS_OK" :				\
	 ((x) == SA_AIS_ERR_LIBRARY) ? "SA_AIS_ERR_LIBRARY" :		\
	 ((x) == SA_AIS_ERR_VERSION) ? "SA_AIS_ERR_VERSION" :		\
	 ((x) == SA_AIS_ERR_INIT) ? "SA_AIS_ERR_INIT" :			\
	 ((x) == SA_AIS_ERR_TIMEOUT) ? "SA_AIS_ERR_TIMEOUT" :		\
	 ((x) == SA_AIS_ERR_TRY_AGAIN) ? "SA_AIS_ERR_TRY_AGAIN" :	\
	 ((x) == SA_AIS_ERR_INVALID_PARAM) ? "SA_AIS_ERR_INVALID_PARAM" : \
	 ((x) == SA_AIS_ERR_NO_MEMORY) ? "SA_AIS_ERR_NO_MEMORY" :	\
	 ((x) == SA_AIS_ERR_BAD_HANDLE) ? "SA_AIS_ERR_BAD_HANDLE" :	\
	 ((x) == SA_AIS_ERR_BUSY) ? "SA_AIS_ERR_BUSY" :			\
	 ((x) == SA_AIS_ERR_ACCESS) ? "SA_AIS_ERR_ACCESS" :		\
	 ((x) == SA_AIS_ERR_NOT_EXIST) ? "SA_AIS_ERR_NOT_EXIST" :	\
	 ((x) == SA_AIS_ERR_NAME_TOO_LONG) ? "SA_AIS_ERR_NAME_TOO_LONG" : \
	 ((x) == SA_AIS_ERR_EXIST) ? "SA_AIS_ERR_EXIST" :		\
	 ((x) == SA_AIS_ERR_NO_SPACE) ? "SA_AIS_ERR_NO_SPACE" :		\
	 ((x) == SA_AIS_ERR_INTERRUPT) ? "SA_AIS_ERR_INTERRUPT" :	\
	 ((x) == SA_AIS_ERR_NAME_NOT_FOUND) ? "SA_AIS_ERR_NAME_NOT_FOUND" : \
	 ((x) == SA_AIS_ERR_NO_RESOURCES) ? "SA_AIS_ERR_NO_RESOURCES" :	\
	 ((x) == SA_AIS_ERR_NOT_SUPPORTED) ? "SA_AIS_ERR_NOT_SUPPORTED" : \
	 ((x) == SA_AIS_ERR_BAD_OPERATION) ? "SA_AIS_ERR_BAD_OPERATION" : \
	 ((x) == SA_AIS_ERR_FAILED_OPERATION) ? "SA_AIS_ERR_FAILED_OPERATION" : \
	 ((x) == SA_AIS_ERR_MESSAGE_ERROR) ? "SA_AIS_ERR_MESSAGE_ERROR" : \
	 ((x) == SA_AIS_ERR_QUEUE_FULL) ? "SA_AIS_ERR_QUEUE_FULL" :	\
	 ((x) == SA_AIS_ERR_QUEUE_NOT_AVAILABLE) ? "SA_AIS_ERR_QUEUE_NOT_AVAILABLE" : \
	 ((x) == SA_AIS_ERR_BAD_FLAGS) ? "SA_AIS_ERR_BAD_FLAGS" :	\
	 ((x) == SA_AIS_ERR_TOO_BIG) ? "SA_AIS_ERR_TOO_BIG" :		\
	 ((x) == SA_AIS_ERR_NO_SECTIONS) ? "SA_AIS_ERR_NO_SECTIONS" :	\
	 "ais_error_unknown")
70 
/*
 * Flag bit OR'd into request_type when a server multicasts the result of a
 * request back to the cluster; it is masked off again before the response
 * is matched to the original request.
 */
#define DM_ULOG_RESPONSE 0x1000

/*
 * Request types exchanged only between cluster members.  Presumably chosen
 * so they do not collide with the kernel's DM_ULOG_* values — confirm
 * against dm-log-userspace.h when adding new types.
 */
#define DM_ULOG_CHECKPOINT_READY 21
#define DM_ULOG_MEMBER_JOIN      22

/*
 * _RQ_TYPE - printable name of a request type, including the cluster-only
 * types above.  The response flag is stripped before falling back to
 * RQ_TYPE().  Fully parenthesized so it behaves as one expression; @x is
 * evaluated more than once.
 */
#define _RQ_TYPE(x)							\
	(((x) == DM_ULOG_CHECKPOINT_READY) ? "DM_ULOG_CHECKPOINT_READY" : \
	 ((x) == DM_ULOG_MEMBER_JOIN) ? "DM_ULOG_MEMBER_JOIN" :		\
	 RQ_TYPE((x) & ~DM_ULOG_RESPONSE))
79 
80 static uint32_t my_cluster_id = 0xDEAD;
81 static SaCkptHandleT ckpt_handle = 0;
82 static SaCkptCallbacksT callbacks = { 0, 0 };
83 static SaVersionT version = { 'B', 1, 1 };
84 
85 #define DEBUGGING_HISTORY 100
86 //static char debugging[DEBUGGING_HISTORY][128];
87 //static int idx = 0;
88 #define LOG_SPRINT(cc, f, arg...) do {				\
89 		cc->idx++;					\
90 		cc->idx = cc->idx % DEBUGGING_HISTORY;		\
91 		sprintf(cc->debugging[cc->idx], f, ## arg);	\
92 	} while (0)
93 
94 static int log_resp_rec = 0;
95 
/*
 * A snapshot of one log's state, prepared for a single requesting node by
 * prepare_checkpoint() and published by export_checkpoint().  Pending
 * snapshots are chained through @next on clog_cpg.checkpoint_list and are
 * released with free_checkpoint().
 */
struct checkpoint_data {
	uint32_t requester;	/* nodeid the checkpoint is intended for */
	char uuid[CPG_MAX_NAME_LENGTH];	/* log uuid, copied from the CPG name */

	int bitmap_size; /* in bytes */
	char *sync_bits;	/* heap buffer filled by push_state() */
	char *clean_bits;	/* heap buffer filled by push_state() */
	char *recovering_region;	/* heap string; exported as strlen()+1 bytes */
	struct checkpoint_data *next;
};
106 
/* Values for clog_cpg.state and clog_cpg.cpg_state */
#define INVALID 0
#define VALID   1
#define LEAVING 2

/* Capacity of clog_cpg.checkpoint_requesters[] */
#define MAX_CHECKPOINT_REQUESTERS 10
/*
 * Per-log cluster (CPG) state; every entry is linked on clog_cpg_list.
 */
struct clog_cpg {
	struct dm_list list;	/* link on clog_cpg_list */

	uint32_t lowest_id;	/* nodeid acting as server; compared with my_cluster_id */
	cpg_handle_t handle;	/* CPG handle, used to look the entry up in callbacks */
	struct cpg_name name;	/* CPG name; holds the log uuid */
	uint64_t luid;

	/* Are we the first, or have we received checkpoint? */
	int state;
	int cpg_state;  /* FIXME: debugging */
	int free_me;	/* when set, do_cluster_work() frees the entry */
	int delay;	/* lower-id nodes seen leaving; resends wait while non-zero */
	int resend_requests;	/* server left: working_list must be resent */
	struct dm_list startup_list;	/* requests queued while state == INVALID */
	struct dm_list working_list;	/* our own requests still awaiting a response */

	int checkpoints_needed;	/* number of valid checkpoint_requesters[] entries */
	uint32_t checkpoint_requesters[MAX_CHECKPOINT_REQUESTERS];
	struct checkpoint_data *checkpoint_list;	/* prepared, not yet exported */
	int idx;	/* current slot in debugging[] (advanced by LOG_SPRINT) */
	char debugging[DEBUGGING_HISTORY][128];
};

/* All clog_cpg entries known to this daemon */
static struct dm_list clog_cpg_list;
137 
138 /*
139  * cluster_send
140  * @rq
141  *
142  * Returns: 0 on success, -Exxx on error
143  */
144 int cluster_send(struct clog_request *rq)
145 {
146 	int r;
147 	int count=0;
148 	int found;
149 	struct iovec iov;
150 	struct clog_cpg *entry;
151 
152 	dm_list_iterate_items(entry, &clog_cpg_list)
153 		if (!strncmp(entry->name.value, rq->u_rq.uuid,
154 			     CPG_MAX_NAME_LENGTH)) {
155 			found = 1;
156 			break;
157 		}
158 
159 	if (!found) {
160 		rq->u_rq.error = -ENOENT;
161 		return -ENOENT;
162 	}
163 
164 	/*
165 	 * Once the request heads for the cluster, the luid looses
166 	 * all its meaning.
167 	 */
168 	rq->u_rq.luid = 0;
169 
170 	iov.iov_base = rq;
171 	iov.iov_len = sizeof(struct clog_request) + rq->u_rq.data_size;
172 
173 	if (entry->cpg_state != VALID)
174 		return -EINVAL;
175 
176 	do {
177 		r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
178 		if (r != SA_AIS_ERR_TRY_AGAIN)
179 			break;
180 		count++;
181 		if (count < 10)
182 			LOG_PRINT("[%s]  Retry #%d of cpg_mcast_joined: %s",
183 				  SHORT_UUID(rq->u_rq.uuid), count,
184 				  str_ais_error(r));
185 		else if ((count < 100) && !(count % 10))
186 			LOG_ERROR("[%s]  Retry #%d of cpg_mcast_joined: %s",
187 				  SHORT_UUID(rq->u_rq.uuid), count,
188 				  str_ais_error(r));
189 		else if ((count < 1000) && !(count % 100))
190 			LOG_ERROR("[%s]  Retry #%d of cpg_mcast_joined: %s",
191 				  SHORT_UUID(rq->u_rq.uuid), count,
192 				  str_ais_error(r));
193 		else if ((count < 10000) && !(count % 1000))
194 			LOG_ERROR("[%s]  Retry #%d of cpg_mcast_joined: %s - "
195 				  "OpenAIS not handling the load?",
196 				  SHORT_UUID(rq->u_rq.uuid), count,
197 				  str_ais_error(r));
198 		usleep(1000);
199 	} while (1);
200 
201 	if (r == CPG_OK)
202 		return 0;
203 
204 	/* error codes found in openais/cpg.h */
205 	LOG_ERROR("cpg_mcast_joined error: %s", str_ais_error(r));
206 
207 	rq->u_rq.error = -EBADE;
208 	return -EBADE;
209 }
210 
211 static struct clog_request *get_matching_rq(struct clog_request *rq,
212 					    struct dm_list *l)
213 {
214 	struct clog_request *match, *n;
215 
216 	dm_list_iterate_items_safe(match, n, l)
217 		if (match->u_rq.seq == rq->u_rq.seq) {
218 			dm_list_del(&match->list);
219 			return match;
220 		}
221 
222 	return NULL;
223 }
224 
225 static char rq_buffer[DM_ULOG_REQUEST_SIZE];
226 static int handle_cluster_request(struct clog_cpg *entry,
227 				  struct clog_request *rq, int server)
228 {
229 	int r = 0;
230 	struct clog_request *tmp = (struct clog_request *)rq_buffer;
231 
232 	/*
233 	 * We need a separate dm_ulog_request struct, one that can carry
234 	 * a return payload.  Otherwise, the memory address after
235 	 * rq will be altered - leading to problems
236 	 */
237 	memset(rq_buffer, 0, sizeof(rq_buffer));
238 	memcpy(tmp, rq, sizeof(struct clog_request) + rq->u_rq.data_size);
239 
240 	/*
241 	 * With resumes, we only handle our own.
242 	 * Resume is a special case that requires
243 	 * local action (to set up CPG), followed by
244 	 * a cluster action to co-ordinate reading
245 	 * the disk and checkpointing
246 	 */
247 	if (tmp->u_rq.request_type == DM_ULOG_RESUME) {
248 		if (tmp->originator == my_cluster_id) {
249 			r = do_request(tmp, server);
250 
251 			r = kernel_send(&tmp->u_rq);
252 			if (r < 0)
253 				LOG_ERROR("Failed to send resume response to kernel");
254 		}
255 		return r;
256 	}
257 
258 	r = do_request(tmp, server);
259 
260 	if (server &&
261 	    (tmp->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
262 	    (tmp->u_rq.request_type != DM_ULOG_POSTSUSPEND)) {
263 		tmp->u_rq.request_type |= DM_ULOG_RESPONSE;
264 
265 		/*
266 		 * Errors from previous functions are in the rq struct.
267 		 */
268 		r = cluster_send(tmp);
269 		if (r < 0)
270 			LOG_ERROR("cluster_send failed: %s", strerror(-r));
271 	}
272 
273 	return r;
274 }
275 
276 static int handle_cluster_response(struct clog_cpg *entry,
277 				   struct clog_request *rq)
278 {
279 	int r = 0;
280 	struct clog_request *orig_rq;
281 
282 	/*
283 	 * If I didn't send it, then I don't care about the response
284 	 */
285 	if (rq->originator != my_cluster_id)
286 		return 0;
287 
288 	rq->u_rq.request_type &= ~DM_ULOG_RESPONSE;
289 	orig_rq = get_matching_rq(rq, &entry->working_list);
290 
291 	if (!orig_rq) {
292 		/* Unable to find match for response */
293 
294 		LOG_ERROR("[%s] No match for cluster response: %s:%u",
295 			  SHORT_UUID(rq->u_rq.uuid),
296 			  _RQ_TYPE(rq->u_rq.request_type),
297 			  rq->u_rq.seq);
298 
299 		LOG_ERROR("Current local list:");
300 		if (dm_list_empty(&entry->working_list))
301 			LOG_ERROR("   [none]");
302 
303 		dm_list_iterate_items(orig_rq, &entry->working_list)
304 			LOG_ERROR("   [%s]  %s:%u",
305 				  SHORT_UUID(orig_rq->u_rq.uuid),
306 				  _RQ_TYPE(orig_rq->u_rq.request_type),
307 				  orig_rq->u_rq.seq);
308 
309 		return -EINVAL;
310 	}
311 
312 	if (log_resp_rec > 0) {
313 		LOG_COND(log_resend_requests,
314 			 "[%s] Response received to %s/#%u",
315 			 SHORT_UUID(rq->u_rq.uuid),
316 			 _RQ_TYPE(rq->u_rq.request_type),
317 			 rq->u_rq.seq);
318 		log_resp_rec--;
319 	}
320 
321 	/* FIXME: Ensure memcpy cannot explode */
322 	memcpy(orig_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
323 
324 	r = kernel_send(&orig_rq->u_rq);
325 	if (r)
326 		LOG_ERROR("Failed to send response to kernel");
327 
328 	free(orig_rq);
329 	return r;
330 }
331 
332 static struct clog_cpg *find_clog_cpg(cpg_handle_t handle)
333 {
334 	struct clog_cpg *match;
335 
336 	dm_list_iterate_items(match, &clog_cpg_list)
337 		if (match->handle == handle)
338 			return match;
339 
340 	return NULL;
341 }
342 
343 /*
344  * prepare_checkpoint
345  * @entry: clog_cpg describing the log
346  * @cp_requester: nodeid requesting the checkpoint
347  *
348  * Creates and fills in a new checkpoint_data struct.
349  *
350  * Returns: checkpoint_data on success, NULL on error
351  */
352 static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
353 						  uint32_t cp_requester)
354 {
355 	int r;
356 	struct checkpoint_data *new;
357 
358 	if (entry->state != VALID) {
359 		/*
360 		 * We can't store bitmaps yet, because the log is not
361 		 * valid yet.
362 		 */
363 		LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
364 			  cp_requester);
365 		return NULL;
366 	}
367 
368 	new = malloc(sizeof(*new));
369 	if (!new) {
370 		LOG_ERROR("Unable to create checkpoint data for %u",
371 			  cp_requester);
372 		return NULL;
373 	}
374 	memset(new, 0, sizeof(*new));
375 	new->requester = cp_requester;
376 	strncpy(new->uuid, entry->name.value, entry->name.length);
377 
378 	new->bitmap_size = push_state(entry->name.value, entry->luid,
379 				      "clean_bits",
380 				      &new->clean_bits, cp_requester);
381 	if (new->bitmap_size <= 0) {
382 		LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
383 			  new->requester);
384 		free(new);
385 		return NULL;
386 	}
387 
388 	new->bitmap_size = push_state(entry->name.value, entry->luid,
389 				      "sync_bits",
390 				      &new->sync_bits, cp_requester);
391 	if (new->bitmap_size <= 0) {
392 		LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
393 			  new->requester);
394 		free(new->clean_bits);
395 		free(new);
396 		return NULL;
397 	}
398 
399 	r = push_state(entry->name.value, entry->luid,
400 		       "recovering_region",
401 		       &new->recovering_region, cp_requester);
402 	if (r <= 0) {
403 		LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
404 			  new->requester);
405 		free(new->sync_bits);
406 		free(new->clean_bits);
407 		free(new);
408 		return NULL;
409 	}
410 	LOG_DBG("[%s] Checkpoint prepared for node %u:",
411 		SHORT_UUID(new->uuid), new->requester);
412 	LOG_DBG("  bitmap_size = %d", new->bitmap_size);
413 
414 	return new;
415 }
416 
417 /*
418  * free_checkpoint
419  * @cp: the checkpoint_data struct to free
420  *
421  */
422 static void free_checkpoint(struct checkpoint_data *cp)
423 {
424 	free(cp->recovering_region);
425 	free(cp->sync_bits);
426 	free(cp->clean_bits);
427 	free(cp);
428 }
429 
/*
 * export_checkpoint
 * @cp: prepared checkpoint data for one requesting node
 *
 * Publishes @cp through the openais checkpoint service as a checkpoint
 * named "bitmaps_<short uuid>_<requester>" with three sections:
 * "sync_bits" and "clean_bits" (bitmap_size bytes each) and
 * "recovering_region" (the string including its NUL).  After the checkpoint
 * is written and closed, a DM_ULOG_CHECKPOINT_READY message is multicast
 * to the log's CPG.  In that message rq->originator is set to the requester,
 * so that node knows the data is meant for it.
 *
 * Every SA_AIS_ERR_TRY_AGAIN is retried after 1ms with no upper bound.
 *
 * NOTE(review): if a section cannot be created, the checkpoint is closed
 * but never unlinked here.  With retentionDuration = SA_TIME_MAX it may
 * linger and make later attempts return -EEXIST — confirm.
 *
 * Returns: 0 on success (also when the ready notice could not be sent;
 *          that failure is only logged),
 *          -EEXIST if the checkpoint or one of its sections already exists,
 *          -EIO on other checkpoint-service failures,
 *          -ENOMEM if the notice could not be allocated
 */
static int export_checkpoint(struct checkpoint_data *cp)
{
	SaCkptCheckpointCreationAttributesT attr;
	SaCkptCheckpointHandleT h;
	SaCkptSectionIdT section_id;
	SaCkptSectionCreationAttributesT section_attr;
	SaCkptCheckpointOpenFlagsT flags;
	SaNameT name;
	SaAisErrorT rv;
	struct clog_request *rq;
	int len, r = 0;
	char buf[32];

	LOG_DBG("Sending checkpointed data to %u", cp->requester);

	/* Checkpoint name encodes the log and the node it is intended for. */
	len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH,
		       "bitmaps_%s_%u", SHORT_UUID(cp->uuid), cp->requester);
	name.length = len;

	/* From here on, len is the recovering_region size including its NUL. */
	len = strlen(cp->recovering_region) + 1;

	attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
	attr.checkpointSize = cp->bitmap_size * 2 + len;

	attr.retentionDuration = SA_TIME_MAX;
	attr.maxSections = 4;      /* 3 sections are created below; don't know why we need +1 */

	attr.maxSectionSize = (cp->bitmap_size > len) ?	cp->bitmap_size : len;
	attr.maxSectionIdSize = 22;

	flags = SA_CKPT_CHECKPOINT_READ |
		SA_CKPT_CHECKPOINT_WRITE |
		SA_CKPT_CHECKPOINT_CREATE;

open_retry:
	rv = saCkptCheckpointOpen(ckpt_handle, &name, &attr, flags, 0, &h);
	if (rv == SA_AIS_ERR_TRY_AGAIN) {
		LOG_ERROR("export_checkpoint: ckpt open retry");
		usleep(1000);
		goto open_retry;
	}

	if (rv == SA_AIS_ERR_EXIST) {
		LOG_DBG("export_checkpoint: checkpoint already exists");
		return -EEXIST;
	}

	if (rv != SA_AIS_OK) {
		LOG_ERROR("[%s] Failed to open checkpoint for %u: %s",
			  SHORT_UUID(cp->uuid), cp->requester,
			  str_ais_error(rv));
		return -EIO; /* FIXME: better error */
	}

	/*
	 * Add section for sync_bits
	 */
	section_id.idLen = snprintf(buf, 32, "sync_bits");
	section_id.id = (unsigned char *)buf;
	section_attr.sectionId = &section_id;
	section_attr.expirationTime = SA_TIME_END;

sync_create_retry:
	rv = saCkptSectionCreate(h, &section_attr,
				 cp->sync_bits, cp->bitmap_size);
	if (rv == SA_AIS_ERR_TRY_AGAIN) {
		LOG_ERROR("Sync checkpoint section create retry");
		usleep(1000);
		goto sync_create_retry;
	}

	if (rv == SA_AIS_ERR_EXIST) {
		LOG_DBG("Sync checkpoint section already exists");
		saCkptCheckpointClose(h);
		return -EEXIST;
	}

	if (rv != SA_AIS_OK) {
		LOG_ERROR("Sync checkpoint section creation failed: %s",
			  str_ais_error(rv));
		saCkptCheckpointClose(h);
		return -EIO; /* FIXME: better error */
	}

	/*
	 * Add section for clean_bits
	 */
	section_id.idLen = snprintf(buf, 32, "clean_bits");
	section_id.id = (unsigned char *)buf;
	section_attr.sectionId = &section_id;
	section_attr.expirationTime = SA_TIME_END;

clean_create_retry:
	rv = saCkptSectionCreate(h, &section_attr, cp->clean_bits, cp->bitmap_size);
	if (rv == SA_AIS_ERR_TRY_AGAIN) {
		LOG_ERROR("Clean checkpoint section create retry");
		usleep(1000);
		goto clean_create_retry;
	}

	if (rv == SA_AIS_ERR_EXIST) {
		LOG_DBG("Clean checkpoint section already exists");
		saCkptCheckpointClose(h);
		return -EEXIST;
	}

	if (rv != SA_AIS_OK) {
		LOG_ERROR("Clean checkpoint section creation failed: %s",
			  str_ais_error(rv));
		saCkptCheckpointClose(h);
		return -EIO; /* FIXME: better error */
	}

	/*
	 * Add section for recovering_region
	 */
	section_id.idLen = snprintf(buf, 32, "recovering_region");
	section_id.id = (unsigned char *)buf;
	section_attr.sectionId = &section_id;
	section_attr.expirationTime = SA_TIME_END;

rr_create_retry:
	rv = saCkptSectionCreate(h, &section_attr, cp->recovering_region,
				 strlen(cp->recovering_region) + 1);
	if (rv == SA_AIS_ERR_TRY_AGAIN) {
		LOG_ERROR("RR checkpoint section create retry");
		usleep(1000);
		goto rr_create_retry;
	}

	if (rv == SA_AIS_ERR_EXIST) {
		LOG_DBG("RR checkpoint section already exists");
		saCkptCheckpointClose(h);
		return -EEXIST;
	}

	if (rv != SA_AIS_OK) {
		LOG_ERROR("RR checkpoint section creation failed: %s",
			  str_ais_error(rv));
		saCkptCheckpointClose(h);
		return -EIO; /* FIXME: better error */
	}

	LOG_DBG("export_checkpoint: closing checkpoint");
	saCkptCheckpointClose(h);

	/*
	 * Tell the cluster the data is ready.  Only the header is cleared and
	 * sent: data_size stays 0.
	 */
	rq = malloc(DM_ULOG_REQUEST_SIZE);
	if (!rq) {
		LOG_ERROR("export_checkpoint: Unable to allocate transfer structs");
		return -ENOMEM;
	}
	memset(rq, 0, sizeof(*rq));

	dm_list_init(&rq->list);
	rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY;
	rq->originator = cp->requester;  /* FIXME: hack to overload meaning of originator */
	strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH);
	rq->u_rq.seq = my_cluster_id;	/* seq carries the sending nodeid here, not a sequence number */

	r = cluster_send(rq);
	if (r)
		LOG_ERROR("Failed to send checkpoint ready notice: %s",
			  strerror(-r));

	free(rq);
	return 0;
}
597 
598 static int import_checkpoint(struct clog_cpg *entry, int no_read)
599 {
600 	int rtn = 0;
601 	SaCkptCheckpointHandleT h;
602 	SaCkptSectionIterationHandleT itr;
603 	SaCkptSectionDescriptorT desc;
604 	SaCkptIOVectorElementT iov;
605 	SaNameT name;
606 	SaAisErrorT rv;
607 	char *bitmap = NULL;
608 	int len;
609 
610 	bitmap = malloc(1024*1024);
611 	if (!bitmap)
612 		return -ENOMEM;
613 
614 	len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
615 		       SHORT_UUID(entry->name.value), my_cluster_id);
616 	name.length = len;
617 
618 open_retry:
619 	rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
620 				  SA_CKPT_CHECKPOINT_READ, 0, &h);
621 	if (rv == SA_AIS_ERR_TRY_AGAIN) {
622 		LOG_ERROR("import_checkpoint: ckpt open retry");
623 		usleep(1000);
624 		goto open_retry;
625 	}
626 
627 	if (rv != SA_AIS_OK) {
628 		LOG_ERROR("[%s] Failed to open checkpoint: %s",
629 			  SHORT_UUID(entry->name.value), str_ais_error(rv));
630 		return -EIO; /* FIXME: better error */
631 	}
632 
633 unlink_retry:
634 	rv = saCkptCheckpointUnlink(ckpt_handle, &name);
635 	if (rv == SA_AIS_ERR_TRY_AGAIN) {
636 		LOG_ERROR("import_checkpoint: ckpt unlink retry");
637 		usleep(1000);
638 		goto unlink_retry;
639 	}
640 
641 	if (no_read) {
642 		LOG_DBG("Checkpoint for this log already received");
643 		goto no_read;
644 	}
645 
646 init_retry:
647 	rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY,
648 					      SA_TIME_END, &itr);
649 	if (rv == SA_AIS_ERR_TRY_AGAIN) {
650 		LOG_ERROR("import_checkpoint: sync create retry");
651 		usleep(1000);
652 		goto init_retry;
653 	}
654 
655 	if (rv != SA_AIS_OK) {
656 		LOG_ERROR("[%s] Sync checkpoint section creation failed: %s",
657 			  SHORT_UUID(entry->name.value), str_ais_error(rv));
658 		return -EIO; /* FIXME: better error */
659 	}
660 
661 	len = 0;
662 	while (1) {
663 		rv = saCkptSectionIterationNext(itr, &desc);
664 		if (rv == SA_AIS_OK)
665 			len++;
666 		else if ((rv == SA_AIS_ERR_NO_SECTIONS) && len)
667 			break;
668 		else if (rv != SA_AIS_ERR_TRY_AGAIN) {
669 			LOG_ERROR("saCkptSectionIterationNext failure: %d", rv);
670 			break;
671 		}
672 	}
673 	saCkptSectionIterationFinalize(itr);
674 	if (len != 3) {
675 		LOG_ERROR("import_checkpoint: %d checkpoint sections found",
676 			  len);
677 		usleep(1000);
678 		goto init_retry;
679 	}
680 	saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY,
681 					 SA_TIME_END, &itr);
682 
683 	while (1) {
684 		rv = saCkptSectionIterationNext(itr, &desc);
685 		if (rv == SA_AIS_ERR_NO_SECTIONS)
686 			break;
687 
688 		if (rv == SA_AIS_ERR_TRY_AGAIN) {
689 			LOG_ERROR("import_checkpoint: ckpt iternext retry");
690 			usleep(1000);
691 			continue;
692 		}
693 
694 		if (rv != SA_AIS_OK) {
695 			LOG_ERROR("import_checkpoint: clean checkpoint section "
696 				  "creation failed: %s", str_ais_error(rv));
697 			rtn = -EIO; /* FIXME: better error */
698 			goto fail;
699 		}
700 
701 		if (!desc.sectionSize) {
702 			LOG_ERROR("Checkpoint section empty");
703 			continue;
704 		}
705 
706 		memset(bitmap, 0, sizeof(*bitmap));
707 		iov.sectionId = desc.sectionId;
708 		iov.dataBuffer = bitmap;
709 		iov.dataSize = desc.sectionSize;
710 		iov.dataOffset = 0;
711 
712 	read_retry:
713 		rv = saCkptCheckpointRead(h, &iov, 1, NULL);
714 		if (rv == SA_AIS_ERR_TRY_AGAIN) {
715 			LOG_ERROR("ckpt read retry");
716 			usleep(1000);
717 			goto read_retry;
718 		}
719 
720 		if (rv != SA_AIS_OK) {
721 			LOG_ERROR("import_checkpoint: ckpt read error: %s",
722 				  str_ais_error(rv));
723 			rtn = -EIO; /* FIXME: better error */
724 			goto fail;
725 		}
726 
727 		if (iov.readSize) {
728 			if (pull_state(entry->name.value, entry->luid,
729 				       (char *)desc.sectionId.id, bitmap,
730 				       iov.readSize)) {
731 				LOG_ERROR("Error loading state");
732 				rtn = -EIO;
733 				goto fail;
734 			}
735 		} else {
736 			/* Need to request new checkpoint */
737 			rtn = -EAGAIN;
738 			goto fail;
739 		}
740 	}
741 
742 fail:
743 	saCkptSectionIterationFinalize(itr);
744 no_read:
745 	saCkptCheckpointClose(h);
746 
747 	free(bitmap);
748 	return rtn;
749 }
750 
751 static void do_checkpoints(struct clog_cpg *entry, int leaving)
752 {
753 	struct checkpoint_data *cp;
754 
755 	for (cp = entry->checkpoint_list; cp;) {
756 		/*
757 		 * FIXME: Check return code.  Could send failure
758 		 * notice in rq in export_checkpoint function
759 		 * by setting rq->error
760 		 */
761 		switch (export_checkpoint(cp)) {
762 		case -EEXIST:
763 			LOG_SPRINT(entry, "[%s] Checkpoint for %u already handled%s",
764 				   SHORT_UUID(entry->name.value), cp->requester,
765 				   (leaving) ? "(L)": "");
766 			LOG_COND(log_checkpoint,
767 				 "[%s] Checkpoint for %u already handled%s",
768 				 SHORT_UUID(entry->name.value), cp->requester,
769 				 (leaving) ? "(L)": "");
770 			entry->checkpoint_list = cp->next;
771 			free_checkpoint(cp);
772 			cp = entry->checkpoint_list;
773 			break;
774 		case 0:
775 			LOG_SPRINT(entry, "[%s] Checkpoint data available for node %u%s",
776 				   SHORT_UUID(entry->name.value), cp->requester,
777 				   (leaving) ? "(L)": "");
778 			LOG_COND(log_checkpoint,
779 				 "[%s] Checkpoint data available for node %u%s",
780 				 SHORT_UUID(entry->name.value), cp->requester,
781 				 (leaving) ? "(L)": "");
782 			entry->checkpoint_list = cp->next;
783 			free_checkpoint(cp);
784 			cp = entry->checkpoint_list;
785 			break;
786 		default:
787 			/* FIXME: Skipping will cause list corruption */
788 			LOG_ERROR("[%s] Failed to export checkpoint for %u%s",
789 				  SHORT_UUID(entry->name.value), cp->requester,
790 				  (leaving) ? "(L)": "");
791 		}
792 	}
793 }
794 
795 static int resend_requests(struct clog_cpg *entry)
796 {
797 	int r = 0;
798 	struct clog_request *rq, *n;
799 
800 	if (!entry->resend_requests || entry->delay)
801 		return 0;
802 
803 	if (entry->state != VALID)
804 		return 0;
805 
806 	entry->resend_requests = 0;
807 
808 	dm_list_iterate_items_safe(rq, n, &entry->working_list) {
809 		dm_list_del(&rq->list);
810 
811 		if (strcmp(entry->name.value, rq->u_rq.uuid)) {
812 			LOG_ERROR("[%s]  Stray request from another log (%s)",
813 				  SHORT_UUID(entry->name.value),
814 				  SHORT_UUID(rq->u_rq.uuid));
815 			free(rq);
816 			continue;
817 		}
818 
819 		switch (rq->u_rq.request_type) {
820 		case DM_ULOG_SET_REGION_SYNC:
821 			/*
822 			 * Some requests simply do not need to be resent.
823 			 * If it is a request that just changes log state,
824 			 * then it doesn't need to be resent (everyone makes
825 			 * updates).
826 			 */
827 			LOG_COND(log_resend_requests,
828 				 "[%s] Skipping resend of %s/#%u...",
829 				 SHORT_UUID(entry->name.value),
830 				 _RQ_TYPE(rq->u_rq.request_type),
831 				 rq->u_rq.seq);
832 			LOG_SPRINT(entry, "###  No resend: [%s] %s/%u ###",
833 				   SHORT_UUID(entry->name.value),
834 				   _RQ_TYPE(rq->u_rq.request_type),
835 				   rq->u_rq.seq);
836 
837 			rq->u_rq.data_size = 0;
838 			kernel_send(&rq->u_rq);
839 
840 			break;
841 
842 		default:
843 			/*
844 			 * If an action or a response is required, then
845 			 * the request must be resent.
846 			 */
847 			LOG_COND(log_resend_requests,
848 				 "[%s] Resending %s(#%u) due to new server(%u)",
849 				 SHORT_UUID(entry->name.value),
850 				 _RQ_TYPE(rq->u_rq.request_type),
851 				 rq->u_rq.seq, entry->lowest_id);
852 			LOG_SPRINT(entry, "***  Resending: [%s] %s/%u ***",
853 				   SHORT_UUID(entry->name.value),
854 				   _RQ_TYPE(rq->u_rq.request_type),
855 				   rq->u_rq.seq);
856 			r = cluster_send(rq);
857 			if (r < 0)
858 				LOG_ERROR("Failed resend");
859 		}
860 		free(rq);
861 	}
862 
863 	return r;
864 }
865 
866 static int do_cluster_work(void *data)
867 {
868 	int r = SA_AIS_OK;
869 	struct clog_cpg *entry;
870 
871 	dm_list_iterate_items(entry, &clog_cpg_list) {
872 		r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL);
873 		if (r != SA_AIS_OK)
874 			LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r));
875 
876 		if (entry->free_me) {
877 			free(entry);
878 			continue;
879 		}
880 		do_checkpoints(entry, 0);
881 
882 		resend_requests(entry);
883 	}
884 
885 	return (r == SA_AIS_OK) ? 0 : -1;  /* FIXME: good error number? */
886 }
887 
888 static int flush_startup_list(struct clog_cpg *entry)
889 {
890 	int r = 0;
891 	int i_was_server;
892 	struct clog_request *rq, *n;
893 	struct checkpoint_data *new;
894 
895 	dm_list_iterate_items_safe(rq, n, &entry->startup_list) {
896 		dm_list_del(&rq->list);
897 
898 		if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) {
899 			new = prepare_checkpoint(entry, rq->originator);
900 			if (!new) {
901 				/*
902 				 * FIXME: Need better error handling.  Other nodes
903 				 * will be trying to send the checkpoint too, and we
904 				 * must continue processing the list; so report error
905 				 * but continue.
906 				 */
907 				LOG_ERROR("Failed to prepare checkpoint for %u!!!",
908 					  rq->originator);
909 				free(rq);
910 				continue;
911 			}
912 			LOG_SPRINT(entry, "[%s] Checkpoint prepared for %u",
913 				   SHORT_UUID(entry->name.value), rq->originator);
914 			LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u",
915 				 SHORT_UUID(entry->name.value), rq->originator);
916 			new->next = entry->checkpoint_list;
917 			entry->checkpoint_list = new;
918 		} else {
919 			LOG_DBG("[%s] Processing delayed request: %s",
920 				SHORT_UUID(rq->u_rq.uuid),
921 				_RQ_TYPE(rq->u_rq.request_type));
922 			i_was_server = (rq->pit_server == my_cluster_id) ? 1 : 0;
923 			r = handle_cluster_request(entry, rq, i_was_server);
924 
925 			if (r)
926 				/*
927 				 * FIXME: If we error out here, we will never get
928 				 * another opportunity to retry these requests
929 				 */
930 				LOG_ERROR("Error while processing delayed CPG message");
931 		}
932 		free(rq);
933 	}
934 
935 	return 0;
936 }
937 
938 static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gname,
939 				 uint32_t nodeid, uint32_t pid,
940 				 void *msg, size_t msg_len)
941 {
942 	int i;
943 	int r = 0;
944 	int i_am_server;
945 	int response = 0;
946 	struct clog_request *rq = msg;
947 	struct clog_request *tmp_rq;
948 	struct clog_cpg *match;
949 
950 	match = find_clog_cpg(handle);
951 	if (!match) {
952 		LOG_ERROR("Unable to find clog_cpg for cluster message");
953 		return;
954 	}
955 
956 	if ((nodeid == my_cluster_id) &&
957 	    !(rq->u_rq.request_type & DM_ULOG_RESPONSE) &&
958 	    (rq->u_rq.request_type != DM_ULOG_RESUME) &&
959 	    (rq->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
960 	    (rq->u_rq.request_type != DM_ULOG_CHECKPOINT_READY)) {
961 		tmp_rq = malloc(DM_ULOG_REQUEST_SIZE);
962 		if (!tmp_rq) {
963 			/*
964 			 * FIXME: It may be possible to continue... but we
965 			 * would not be able to resend any messages that might
966 			 * be necessary during membership changes
967 			 */
968 			LOG_ERROR("[%s] Unable to record request: -ENOMEM",
969 				  SHORT_UUID(rq->u_rq.uuid));
970 			return;
971 		}
972 		memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
973 		dm_list_init(&tmp_rq->list);
974 		dm_list_add( &match->working_list, &tmp_rq->list);
975 	}
976 
977 	if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) {
978 		/*
979 		 * If the server (lowest_id) indicates it is leaving,
980 		 * then we must resend any outstanding requests.  However,
981 		 * we do not want to resend them if the next server in
982 		 * line is in the process of leaving.
983 		 */
984 		if (nodeid == my_cluster_id) {
985 			LOG_COND(log_resend_requests, "[%s] I am leaving.1.....",
986 				 SHORT_UUID(rq->u_rq.uuid));
987 		} else {
988 			if (nodeid < my_cluster_id) {
989 				if (nodeid == match->lowest_id) {
990 					match->resend_requests = 1;
991 					LOG_COND(log_resend_requests, "[%s] %u is leaving, resend required%s",
992 						 SHORT_UUID(rq->u_rq.uuid), nodeid,
993 						 (dm_list_empty(&match->working_list)) ? " -- working_list empty": "");
994 
995 					dm_list_iterate_items(tmp_rq, &match->working_list)
996 						LOG_COND(log_resend_requests,
997 							 "[%s]                %s/%u",
998 							 SHORT_UUID(tmp_rq->u_rq.uuid),
999 							 _RQ_TYPE(tmp_rq->u_rq.request_type),
1000 							 tmp_rq->u_rq.seq);
1001 				}
1002 
1003 				match->delay++;
1004 				LOG_COND(log_resend_requests, "[%s] %u is leaving, delay = %d",
1005 					 SHORT_UUID(rq->u_rq.uuid), nodeid, match->delay);
1006 			}
1007 			rq->originator = nodeid; /* don't really need this, but nice for debug */
1008 			goto out;
1009 		}
1010 	}
1011 
1012 	/*
1013 	 * We can receive messages after we do a cpg_leave but before we
1014 	 * get our config callback.  However, since we can't respond after
1015 	 * leaving, we simply return.
1016 	 */
1017 	if (match->state == LEAVING)
1018 		return;
1019 
1020 	i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
1021 
1022 	if (rq->u_rq.request_type == DM_ULOG_CHECKPOINT_READY) {
1023 		if (my_cluster_id == rq->originator) {
1024 			/* Redundant checkpoints ignored if match->valid */
1025 			LOG_SPRINT(match, "[%s] CHECKPOINT_READY notification from %u",
1026 				   SHORT_UUID(rq->u_rq.uuid), nodeid);
1027 			if (import_checkpoint(match, (match->state != INVALID))) {
1028 				LOG_SPRINT(match,
1029 					   "[%s] Failed to import checkpoint from %u",
1030 					   SHORT_UUID(rq->u_rq.uuid), nodeid);
1031 				LOG_ERROR("[%s] Failed to import checkpoint from %u",
1032 					  SHORT_UUID(rq->u_rq.uuid), nodeid);
1033 				kill(getpid(), SIGUSR1);
1034 				/* Could we retry? */
1035 				goto out;
1036 			} else if (match->state == INVALID) {
1037 				LOG_SPRINT(match,
1038 					   "[%s] Checkpoint data received from %u.  Log is now valid",
1039 					   SHORT_UUID(match->name.value), nodeid);
1040 				LOG_COND(log_checkpoint,
1041 					 "[%s] Checkpoint data received from %u.  Log is now valid",
1042 					 SHORT_UUID(match->name.value), nodeid);
1043 				match->state = VALID;
1044 
1045 				flush_startup_list(match);
1046 			} else {
1047 				LOG_SPRINT(match,
1048 					   "[%s] Redundant checkpoint from %u ignored.",
1049 					   SHORT_UUID(rq->u_rq.uuid), nodeid);
1050 			}
1051 		}
1052 		goto out;
1053 	}
1054 
1055 	if (rq->u_rq.request_type & DM_ULOG_RESPONSE) {
1056 		response = 1;
1057 		r = handle_cluster_response(match, rq);
1058 	} else {
1059 		rq->originator = nodeid;
1060 
1061 		if (match->state == LEAVING) {
1062 			LOG_ERROR("[%s]  Ignoring %s from %u.  Reason: I'm leaving",
1063 				  SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type),
1064 				  rq->originator);
1065 			goto out;
1066 		}
1067 
1068 		if (match->state == INVALID) {
1069 			LOG_DBG("Log not valid yet, storing request");
1070 			tmp_rq = malloc(DM_ULOG_REQUEST_SIZE);
1071 			if (!tmp_rq) {
1072 				LOG_ERROR("cpg_message_callback:  Unable to"
1073 					  " allocate transfer structs");
1074 				r = -ENOMEM; /* FIXME: Better error #? */
1075 				goto out;
1076 			}
1077 
1078 			memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
1079 			tmp_rq->pit_server = match->lowest_id;
1080 			dm_list_init(&tmp_rq->list);
1081 			dm_list_add(&match->startup_list, &tmp_rq->list);
1082 			goto out;
1083 		}
1084 
1085 		r = handle_cluster_request(match, rq, i_am_server);
1086 	}
1087 
1088 	/*
1089 	 * If the log is now valid, we can queue the checkpoints
1090 	 */
1091 	for (i = match->checkpoints_needed; i; ) {
1092 		struct checkpoint_data *new;
1093 
1094 		if (log_get_state(&rq->u_rq) != LOG_RESUMED) {
1095 			LOG_DBG("[%s] Withholding checkpoints until log is valid (%s from %u)",
1096 				SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), nodeid);
1097 			break;
1098 		}
1099 
1100 		i--;
1101 		new = prepare_checkpoint(match, match->checkpoint_requesters[i]);
1102 		if (!new) {
1103 			/* FIXME: Need better error handling */
1104 			LOG_ERROR("[%s] Failed to prepare checkpoint for %u!!!",
1105 				  SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
1106 			break;
1107 		}
1108 		LOG_SPRINT(match, "[%s] Checkpoint prepared for %u* (%s)",
1109 			   SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i],
1110 			   (log_get_state(&rq->u_rq) != LOG_RESUMED)? "LOG_RESUMED": "LOG_SUSPENDED");
1111 		LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u*",
1112 			 SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
1113 		match->checkpoints_needed--;
1114 
1115 		new->next = match->checkpoint_list;
1116 		match->checkpoint_list = new;
1117 	}
1118 
1119 out:
1120 	/* nothing happens after this point.  It is just for debugging */
1121 	if (r) {
1122 		LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
1123 			  SHORT_UUID(rq->u_rq.uuid),
1124 			  _RQ_TYPE(rq->u_rq.request_type & ~DM_ULOG_RESPONSE),
1125 			  strerror(-r));
1126 		LOG_ERROR("[%s]    Response  : %s", SHORT_UUID(rq->u_rq.uuid),
1127 			  (response) ? "YES" : "NO");
1128 		LOG_ERROR("[%s]    Originator: %u",
1129 			  SHORT_UUID(rq->u_rq.uuid), rq->originator);
1130 		if (response)
1131 			LOG_ERROR("[%s]    Responder : %u",
1132 				  SHORT_UUID(rq->u_rq.uuid), nodeid);
1133 
1134 		LOG_ERROR("HISTORY::");
1135 		for (i = 0; i < DEBUGGING_HISTORY; i++) {
1136 			match->idx++;
1137 			match->idx = match->idx % DEBUGGING_HISTORY;
1138 			if (match->debugging[match->idx][0] == '\0')
1139 				continue;
1140 			LOG_ERROR("%d:%d) %s", i, match->idx,
1141 				  match->debugging[match->idx]);
1142 		}
1143 	} else if (!(rq->u_rq.request_type & DM_ULOG_RESPONSE) ||
1144 		   (rq->originator == my_cluster_id)) {
1145 		if (!response)
1146 			LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
1147 				   rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
1148 				   _RQ_TYPE(rq->u_rq.request_type),
1149 				   rq->originator, (response) ? "YES" : "NO");
1150 		else
1151 			LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u",
1152 				   rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
1153 				   _RQ_TYPE(rq->u_rq.request_type),
1154 				   rq->originator, (response) ? "YES" : "NO",
1155 				   nodeid);
1156 	}
1157 }
1158 
1159 static void cpg_join_callback(struct clog_cpg *match,
1160 			      const struct cpg_address *joined,
1161 			      const struct cpg_address *member_list,
1162 			      size_t member_list_entries)
1163 {
1164 	int i;
1165 	int my_pid = getpid();
1166 	uint32_t lowest = match->lowest_id;
1167 	struct clog_request *rq;
1168 	char dbuf[32];
1169 
1170 	/* Assign my_cluster_id */
1171 	if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid))
1172 		my_cluster_id = joined->nodeid;
1173 
1174 	/* Am I the very first to join? */
1175 	if (member_list_entries == 1) {
1176 		match->lowest_id = joined->nodeid;
1177 		match->state = VALID;
1178 	}
1179 
1180 	/* If I am part of the joining list, I do not send checkpoints */
1181 	if (joined->nodeid == my_cluster_id)
1182 		goto out;
1183 
1184 	memset(dbuf, 0, sizeof(dbuf));
1185 	for (i = 0; i < (member_list_entries-1); i++)
1186 		sprintf(dbuf+strlen(dbuf), "%u-", member_list[i].nodeid);
1187 	sprintf(dbuf+strlen(dbuf), "(%u)", joined->nodeid);
1188 	LOG_COND(log_checkpoint, "[%s] Joining node, %u needs checkpoint [%s]",
1189 		 SHORT_UUID(match->name.value), joined->nodeid, dbuf);
1190 
1191 	/*
1192 	 * FIXME: remove checkpoint_requesters/checkpoints_needed, and use
1193 	 * the startup_list interface exclusively
1194 	 */
1195 	if (dm_list_empty(&match->startup_list) && (match->state == VALID) &&
1196 	    (match->checkpoints_needed < MAX_CHECKPOINT_REQUESTERS)) {
1197 		match->checkpoint_requesters[match->checkpoints_needed++] = joined->nodeid;
1198 		goto out;
1199 	}
1200 
1201 	rq = malloc(DM_ULOG_REQUEST_SIZE);
1202 	if (!rq) {
1203 		LOG_ERROR("cpg_config_callback: "
1204 			  "Unable to allocate transfer structs");
1205 		LOG_ERROR("cpg_config_callback: "
1206 			  "Unable to perform checkpoint");
1207 		goto out;
1208 	}
1209 	rq->u_rq.request_type = DM_ULOG_MEMBER_JOIN;
1210 	rq->originator = joined->nodeid;
1211 	dm_list_init(&rq->list);
1212 	dm_list_add(&match->startup_list, &rq->list);
1213 
1214 out:
1215 	/* Find the lowest_id, i.e. the server */
1216 	match->lowest_id = member_list[0].nodeid;
1217 	for (i = 0; i < member_list_entries; i++)
1218 		if (match->lowest_id > member_list[i].nodeid)
1219 			match->lowest_id = member_list[i].nodeid;
1220 
1221 	if (lowest == 0xDEAD)
1222 		LOG_COND(log_membership_change, "[%s]  Server change <none> -> %u (%u %s)",
1223 			 SHORT_UUID(match->name.value), match->lowest_id,
1224 			 joined->nodeid, (member_list_entries == 1) ?
1225 			 "is first to join" : "joined");
1226 	else if (lowest != match->lowest_id)
1227 		LOG_COND(log_membership_change, "[%s]  Server change %u -> %u (%u joined)",
1228 			 SHORT_UUID(match->name.value), lowest,
1229 			 match->lowest_id, joined->nodeid);
1230 	else
1231 		LOG_COND(log_membership_change, "[%s]  Server unchanged at %u (%u joined)",
1232 			 SHORT_UUID(match->name.value),
1233 			 lowest, joined->nodeid);
1234 	LOG_SPRINT(match, "+++  UUID=%s  %u join  +++",
1235 		   SHORT_UUID(match->name.value), joined->nodeid);
1236 }
1237 
/*
 * cpg_leave_callback
 * @match: the clog_cpg entry whose CPG group saw the departure
 * @left: address of the node that left
 * @member_list: CPG membership after the departure
 * @member_list_entries: number of entries in @member_list
 *
 * If this node is the one leaving, the teardown is finished here:
 * - unlink @match from clog_cpg_list;
 * - unregister the CPG fd and call cluster_postsuspend();
 * - drain working_list, forwarding any DM_ULOG_POSTSUSPEND request to the
 *   kernel;
 * - finalize the CPG handle and set free_me.
 *
 * For every departure (including our own) it then:
 * - drops checkpoint work queued for the departing node;
 * - adjusts the resend delay;
 * - recomputes lowest_id (the server);
 * - if this log is still INVALID and not being freed, and every other
 *   member is itself waiting on the startup_list, promotes the log to
 *   VALID.
 */
static void cpg_leave_callback(struct clog_cpg *match,
			       const struct cpg_address *left,
			       const struct cpg_address *member_list,
			       size_t member_list_entries)
{
	int i, j, fd;
	uint32_t lowest = match->lowest_id;
	struct clog_request *rq, *n;
	struct checkpoint_data *p_cp, *c_cp;

	LOG_SPRINT(match, "---  UUID=%s  %u left  ---",
		   SHORT_UUID(match->name.value), left->nodeid);

	/* Am I leaving? */
	if (my_cluster_id == left->nodeid) {
		LOG_DBG("Finalizing leave...");
		dm_list_del(&match->list);

		/* Stop monitoring this CPG's fd before the handle is finalized */
		cpg_fd_get(match->handle, &fd);
		links_unregister(fd);

		cluster_postsuspend(match->name.value, match->luid);

		/*
		 * Discard outstanding requests; only a pending POSTSUSPEND
		 * is answered (sent back via kernel_send) before being freed.
		 */
		dm_list_iterate_items_safe(rq, n, &match->working_list) {
			dm_list_del(&rq->list);

			if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND)
				kernel_send(&rq->u_rq);
			free(rq);
		}

		cpg_finalize(match->handle);

		/* The entry itself is released elsewhere once free_me is set */
		match->free_me = 1;
		match->lowest_id = 0xDEAD;
		match->state = INVALID;
	}

	/*
	 * Remove any pending checkpoints for the leaving node.
	 * NOTE(review): only the first matching entry in checkpoint_list is
	 * removed; confirm a node can never have more than one queued.
	 */
	for (p_cp = NULL, c_cp = match->checkpoint_list;
	     c_cp && (c_cp->requester != left->nodeid);
	     p_cp = c_cp, c_cp = c_cp->next);
	if (c_cp) {
		if (p_cp)
			p_cp->next = c_cp->next;
		else
			match->checkpoint_list = c_cp->next;

		LOG_COND(log_checkpoint,
			 "[%s] Removing pending checkpoint (%u is leaving)",
			 SHORT_UUID(match->name.value), left->nodeid);
		free_checkpoint(c_cp);
	}
	/* Drop any queued DM_ULOG_MEMBER_JOIN entries for the departing node */
	dm_list_iterate_items_safe(rq, n, &match->startup_list) {
		if ((rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) &&
		    (rq->originator == left->nodeid)) {
			LOG_COND(log_checkpoint,
				 "[%s] Removing pending ckpt from startup list (%u is leaving)",
				 SHORT_UUID(match->name.value), left->nodeid);
			dm_list_del(&rq->list);
			free(rq);
		}
	}
	/*
	 * Compact checkpoint_requesters[] in place, dropping the departing
	 * node.  Each entry is copied to slot j.  When the copied entry is
	 * the departing node, j is stepped back so the next entry overwrites
	 * it.
	 */
	for (i = 0, j = 0; i < match->checkpoints_needed; i++, j++) {
		match->checkpoint_requesters[j] = match->checkpoint_requesters[i];
		if (match->checkpoint_requesters[i] == left->nodeid) {
			LOG_ERROR("[%s] Removing pending ckpt from needed list (%u is leaving)",
				  SHORT_UUID(match->name.value), left->nodeid);
			j--;
		}
	}
	match->checkpoints_needed = j;

	/*
	 * A lower-numbered node has left:
	 * - decrement the delay counter, never going below 0;
	 * - clear resend_requests once no delay remains and working_list is
	 *   empty.
	 * Presumably this balances the delay++ done when the leaving node's
	 * departure was first noticed in cpg_message_callback -- confirm.
	 */
	if (left->nodeid < my_cluster_id) {
		match->delay = (match->delay > 0) ? match->delay - 1 : 0;
		if (!match->delay && dm_list_empty(&match->working_list))
			match->resend_requests = 0;
		LOG_COND(log_resend_requests, "[%s] %u has left, delay = %d%s",
			 SHORT_UUID(match->name.value), left->nodeid,
			 match->delay, (dm_list_empty(&match->working_list)) ?
			 " -- working_list empty": "");
	}

	/* Find the lowest_id, i.e. the server */
	if (!member_list_entries) {
		/* Group is now empty: there is no server */
		match->lowest_id = 0xDEAD;
		LOG_COND(log_membership_change, "[%s]  Server change %u -> <none> "
			 "(%u is last to leave)",
			 SHORT_UUID(match->name.value), left->nodeid,
			 left->nodeid);
		return;
	}

	match->lowest_id = member_list[0].nodeid;
	for (i = 0; i < member_list_entries; i++)
		if (match->lowest_id > member_list[i].nodeid)
			match->lowest_id = member_list[i].nodeid;

	if (lowest != match->lowest_id) {
		LOG_COND(log_membership_change, "[%s]  Server change %u -> %u (%u left)",
			 SHORT_UUID(match->name.value), lowest,
			 match->lowest_id, left->nodeid);
	} else
		LOG_COND(log_membership_change, "[%s]  Server unchanged at %u (%u left)",
			 SHORT_UUID(match->name.value), lowest, left->nodeid);

	if ((match->state == INVALID) && !match->free_me) {
		/*
		 * If all CPG members are waiting for checkpoints and they
		 * are all present in my startup_list, then I was the first to
		 * join and I must assume control.
		 *
		 * We do not normally end up here, but if there was a quick
		 * 'resume -> suspend -> resume' across the cluster, we may
		 * have initially thought we were not the first to join because
		 * of the presence of out-going (and unable to respond) members.
		 */

		i = 1; /* We do not have a DM_ULOG_MEMBER_JOIN entry of our own */
		dm_list_iterate_items(rq, &match->startup_list)
			if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN)
				i++;

		if (i == member_list_entries) {
			/*
			 * Last node who could have given me a checkpoint just left.
			 * Setting log state to VALID and acting as 'first join'.
			 */
			match->state = VALID;
			flush_startup_list(match);
		}
	}
}
1371 
1372 static void cpg_config_callback(cpg_handle_t handle, const struct cpg_name *gname,
1373 				const struct cpg_address *member_list,
1374 				size_t member_list_entries,
1375 				const struct cpg_address *left_list,
1376 				size_t left_list_entries,
1377 				const struct cpg_address *joined_list,
1378 				size_t joined_list_entries)
1379 {
1380 	struct clog_cpg *match;
1381 	int found = 0;
1382 
1383 	dm_list_iterate_items(match, &clog_cpg_list)
1384 		if (match->handle == handle) {
1385 			found = 1;
1386 			break;
1387 		}
1388 
1389 	if (!found) {
1390 		LOG_ERROR("Unable to find match for CPG config callback");
1391 		return;
1392 	}
1393 
1394 	if ((joined_list_entries + left_list_entries) > 1)
1395 		LOG_ERROR("[%s]  More than one node joining/leaving",
1396 			  SHORT_UUID(match->name.value));
1397 
1398 	if (joined_list_entries)
1399 		cpg_join_callback(match, joined_list,
1400 				  member_list, member_list_entries);
1401 	else
1402 		cpg_leave_callback(match, left_list,
1403 				   member_list, member_list_entries);
1404 }
1405 
1406 cpg_callbacks_t cpg_callbacks = {
1407 	.cpg_deliver_fn = cpg_message_callback,
1408 	.cpg_confchg_fn = cpg_config_callback,
1409 };
1410 
1411 /*
1412  * remove_checkpoint
1413  * @entry
1414  *
1415  * Returns: 1 if checkpoint removed, 0 if no checkpoints, -EXXX on error
1416  */
1417 int remove_checkpoint(struct clog_cpg *entry)
1418 {
1419 	int len;
1420 	SaNameT name;
1421 	SaAisErrorT rv;
1422 	SaCkptCheckpointHandleT h;
1423 
1424 	len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
1425                        SHORT_UUID(entry->name.value), my_cluster_id);
1426 	name.length = len;
1427 
1428 open_retry:
1429 	rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
1430                                   SA_CKPT_CHECKPOINT_READ, 0, &h);
1431 	if (rv == SA_AIS_ERR_TRY_AGAIN) {
1432 		LOG_ERROR("abort_startup: ckpt open retry");
1433                 usleep(1000);
1434                 goto open_retry;
1435         }
1436 
1437 	if (rv != SA_AIS_OK)
1438                 return 0;
1439 
1440 	LOG_DBG("[%s]  Removing checkpoint", SHORT_UUID(entry->name.value));
1441 unlink_retry:
1442         rv = saCkptCheckpointUnlink(ckpt_handle, &name);
1443         if (rv == SA_AIS_ERR_TRY_AGAIN) {
1444                 LOG_ERROR("abort_startup: ckpt unlink retry");
1445                 usleep(1000);
1446                 goto unlink_retry;
1447         }
1448 
1449 	if (rv != SA_AIS_OK) {
1450                 LOG_ERROR("[%s] Failed to unlink checkpoint: %s",
1451                           SHORT_UUID(entry->name.value), str_ais_error(rv));
1452                 return -EIO;
1453         }
1454 
1455 	saCkptCheckpointClose(h);
1456 
1457 	return 1;
1458 }
1459 
1460 int create_cluster_cpg(char *uuid, uint64_t luid)
1461 {
1462 	int r;
1463 	int size;
1464 	struct clog_cpg *new = NULL;
1465 	struct clog_cpg *tmp;
1466 
1467 	dm_list_iterate_items(tmp, &clog_cpg_list)
1468 		if (!strncmp(tmp->name.value, uuid, CPG_MAX_NAME_LENGTH)) {
1469 			LOG_ERROR("Log entry already exists: %s", uuid);
1470 			return -EEXIST;
1471 		}
1472 
1473 	new = malloc(sizeof(*new));
1474 	if (!new) {
1475 		LOG_ERROR("Unable to allocate memory for clog_cpg");
1476 		return -ENOMEM;
1477 	}
1478 	memset(new, 0, sizeof(*new));
1479 	dm_list_init(&new->list);
1480 	new->lowest_id = 0xDEAD;
1481 	dm_list_init(&new->startup_list);
1482 	dm_list_init(&new->working_list);
1483 
1484 	size = ((strlen(uuid) + 1) > CPG_MAX_NAME_LENGTH) ?
1485 		CPG_MAX_NAME_LENGTH : (strlen(uuid) + 1);
1486 	strncpy(new->name.value, uuid, size);
1487 	new->name.length = size;
1488 	new->luid = luid;
1489 
1490 	/*
1491 	 * Ensure there are no stale checkpoints around before we join
1492 	 */
1493 	if (remove_checkpoint(new) == 1)
1494 		LOG_COND(log_checkpoint,
1495 			 "[%s]  Removing checkpoints left from previous session",
1496 			 SHORT_UUID(new->name.value));
1497 
1498 	r = cpg_initialize(&new->handle, &cpg_callbacks);
1499 	if (r != SA_AIS_OK) {
1500 		LOG_ERROR("cpg_initialize failed:  Cannot join cluster");
1501 		free(new);
1502 		return -EPERM;
1503 	}
1504 
1505 	r = cpg_join(new->handle, &new->name);
1506 	if (r != SA_AIS_OK) {
1507 		LOG_ERROR("cpg_join failed:  Cannot join cluster");
1508 		free(new);
1509 		return -EPERM;
1510 	}
1511 
1512 	new->cpg_state = VALID;
1513 	dm_list_add(&clog_cpg_list, &new->list);
1514 	LOG_DBG("New   handle: %llu", (unsigned long long)new->handle);
1515 	LOG_DBG("New   name: %s", new->name.value);
1516 
1517 	/* FIXME: better variable */
1518 	cpg_fd_get(new->handle, &r);
1519 	links_register(r, "cluster", do_cluster_work, NULL);
1520 
1521 	return 0;
1522 }
1523 
1524 static void abort_startup(struct clog_cpg *del)
1525 {
1526 	struct clog_request *rq, *n;
1527 
1528 	LOG_DBG("[%s]  CPG teardown before checkpoint received",
1529 		SHORT_UUID(del->name.value));
1530 
1531 	dm_list_iterate_items_safe(rq, n, &del->startup_list) {
1532 		dm_list_del(&rq->list);
1533 
1534 		LOG_DBG("[%s]  Ignoring request from %u: %s",
1535 			SHORT_UUID(del->name.value), rq->originator,
1536 			_RQ_TYPE(rq->u_rq.request_type));
1537 		free(rq);
1538 	}
1539 
1540 	remove_checkpoint(del);
1541 }
1542 
1543 static int _destroy_cluster_cpg(struct clog_cpg *del)
1544 {
1545 	int r;
1546 	int state;
1547 
1548 	LOG_COND(log_resend_requests, "[%s] I am leaving.2.....",
1549 		 SHORT_UUID(del->name.value));
1550 
1551 	/*
1552 	 * We must send any left over checkpoints before
1553 	 * leaving.  If we don't, an incoming node could
1554 	 * be stuck with no checkpoint and stall.
1555 	 do_checkpoints(del); --- THIS COULD BE CAUSING OUR PROBLEMS:
1556 
1557 	 - Incoming node deletes old checkpoints before joining
1558 	 - A stale checkpoint is issued here by leaving node
1559 	 - (leaving node leaves)
1560 	 - Incoming node joins cluster and finds stale checkpoint.
1561 	 - (leaving node leaves - option 2)
1562 	*/
1563 	do_checkpoints(del, 1);
1564 
1565 	state = del->state;
1566 
1567 	del->cpg_state = INVALID;
1568 	del->state = LEAVING;
1569 
1570 	/*
1571 	 * If the state is VALID, we might be processing the
1572 	 * startup list.  If so, we certainly don't want to
1573 	 * clear the startup_list here by calling abort_startup
1574 	 */
1575 	if (!dm_list_empty(&del->startup_list) && (state != VALID))
1576 		abort_startup(del);
1577 
1578 	r = cpg_leave(del->handle, &del->name);
1579 	if (r != CPG_OK)
1580 		LOG_ERROR("Error leaving CPG!");
1581 	return 0;
1582 }
1583 
1584 int destroy_cluster_cpg(char *uuid)
1585 {
1586 	struct clog_cpg *del, *tmp;
1587 
1588 	dm_list_iterate_items_safe(del, tmp, &clog_cpg_list)
1589 		if (!strncmp(del->name.value, uuid, CPG_MAX_NAME_LENGTH))
1590 			_destroy_cluster_cpg(del);
1591 
1592 	return 0;
1593 }
1594 
1595 int init_cluster(void)
1596 {
1597 	SaAisErrorT rv;
1598 
1599 	dm_list_init(&clog_cpg_list);
1600 	rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);
1601 
1602 	if (rv != SA_AIS_OK)
1603 		return EXIT_CLUSTER_CKPT_INIT;
1604 
1605 	return 0;
1606 }
1607 
1608 void cleanup_cluster(void)
1609 {
1610 	SaAisErrorT err;
1611 
1612 	err = saCkptFinalize(ckpt_handle);
1613 	if (err != SA_AIS_OK)
1614 		LOG_ERROR("Failed to finalize checkpoint handle");
1615 }
1616 
1617 void cluster_debug(void)
1618 {
1619 	struct checkpoint_data *cp;
1620 	struct clog_cpg *entry;
1621 	struct clog_request *rq;
1622 	int i;
1623 
1624 	LOG_ERROR("");
1625 	LOG_ERROR("CLUSTER COMPONENT DEBUGGING::");
1626 	dm_list_iterate_items(entry, &clog_cpg_list) {
1627 		LOG_ERROR("%s::", SHORT_UUID(entry->name.value));
1628 		LOG_ERROR("  lowest_id         : %u", entry->lowest_id);
1629 		LOG_ERROR("  state             : %s", (entry->state == INVALID) ?
1630 			  "INVALID" : (entry->state == VALID) ? "VALID" :
1631 			  (entry->state == LEAVING) ? "LEAVING" : "UNKNOWN");
1632 		LOG_ERROR("  cpg_state         : %d", entry->cpg_state);
1633 		LOG_ERROR("  free_me           : %d", entry->free_me);
1634 		LOG_ERROR("  delay             : %d", entry->delay);
1635 		LOG_ERROR("  resend_requests   : %d", entry->resend_requests);
1636 		LOG_ERROR("  checkpoints_needed: %d", entry->checkpoints_needed);
1637 		for (i = 0, cp = entry->checkpoint_list;
1638 		     i < MAX_CHECKPOINT_REQUESTERS; i++)
1639 			if (cp)
1640 				cp = cp->next;
1641 			else
1642 				break;
1643 		LOG_ERROR("  CKPTs waiting     : %d", i);
1644 		LOG_ERROR("  Working list:");
1645 		dm_list_iterate_items(rq, &entry->working_list)
1646 			LOG_ERROR("  %s/%u", _RQ_TYPE(rq->u_rq.request_type),
1647 				  rq->u_rq.seq);
1648 
1649 		LOG_ERROR("  Startup list:");
1650 		dm_list_iterate_items(rq, &entry->startup_list)
1651 			LOG_ERROR("  %s/%u", _RQ_TYPE(rq->u_rq.request_type),
1652 				  rq->u_rq.seq);
1653 
1654 		LOG_ERROR("Command History:");
1655 		for (i = 0; i < DEBUGGING_HISTORY; i++) {
1656 			entry->idx++;
1657 			entry->idx = entry->idx % DEBUGGING_HISTORY;
1658 			if (entry->debugging[entry->idx][0] == '\0')
1659 				continue;
1660 			LOG_ERROR("%d:%d) %s", i, entry->idx,
1661 				  entry->debugging[entry->idx]);
1662 		}
1663 	}
1664 }
1665