1 /*	$NetBSD: functions.c,v 1.1.1.1 2009/12/02 00:27:10 haad Exp $	*/
2 
3 /*
4  * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
5  *
6  * This copyrighted material is made available to anyone wishing to use,
7  * modify, copy, or redistribute it subject to the terms and conditions
8  * of the GNU Lesser General Public License v.2.1.
9  *
10  * You should have received a copy of the GNU Lesser General Public License
11  * along with this program; if not, write to the Free Software Foundation,
12  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
13  */
14 #define _GNU_SOURCE
15 #define _FILE_OFFSET_BITS 64
16 
17 #include <stdint.h>
18 #include <errno.h>
19 #include <string.h>
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <dirent.h>
23 #include <unistd.h>
24 #include <signal.h>
25 #include <linux/kdev_t.h>
26 //#define __USE_GNU /* for O_DIRECT */
27 #include <fcntl.h>
28 #include <time.h>
29 #include "libdevmapper.h"
30 #include "dm-log-userspace.h"
31 #include "functions.h"
32 #include "common.h"
33 #include "cluster.h"
34 #include "logging.h"
35 
36 #define BYTE_SHIFT 3
37 
38 /*
39  * Magic for persistent mirrors: "MiRr"
40  * Following on-disk header information is stolen from
41  * drivers/md/dm-log.c
42  */
43 #define MIRROR_MAGIC 0x4D695272
44 #define MIRROR_DISK_VERSION 2
45 #define LOG_OFFSET 2
46 
#define RESYNC_HISTORY 50
/*
 * Append a formatted entry to a log's in-core resync-history ring
 * (wraps after RESYNC_HISTORY entries).  Fixes two defects of the
 * original: the body referenced the caller's variable 'lc' instead of
 * the macro parameter, and the unbounded sprintf could overrun the
 * 128-byte history slot (snprintf truncates instead).
 */
#define LOG_SPRINT(_lc, f, arg...) do {					\
		(_lc)->idx = ((_lc)->idx + 1) % RESYNC_HISTORY;		\
		snprintf((_lc)->resync_history[(_lc)->idx],		\
			 sizeof((_lc)->resync_history[(_lc)->idx]),	\
			 f, ## arg);					\
	} while (0)
55 
/*
 * On-disk log header.  Layout is shared with the kernel
 * (drivers/md/dm-log.c) -- see the note above MIRROR_MAGIC.
 */
struct log_header {
        uint32_t magic;         /* MIRROR_MAGIC identifies a valid log */
        uint32_t version;       /* MIRROR_DISK_VERSION */
        uint64_t nr_regions;    /* regions covered by the on-disk bits */
};
61 
/* In-core state for one clustered mirror log. */
struct log_c {
	struct dm_list list;	/* on log_list or log_pending_list */

	char uuid[DM_UUID_LEN];
	uint64_t luid;		/* local id; disambiguates same-uuid logs */

	time_t delay; /* limits how fast a resume can happen after suspend */
	int touched;		/* non-zero => bits changed since last flush */
	uint32_t region_size;	/* region size (same units as device_size) */
	uint32_t region_count;	/* total regions in the mirror */
	uint64_t sync_count;	/* number of set bits in sync_bits */

	dm_bitset_t clean_bits;	/* regions with no outstanding writes */
	dm_bitset_t sync_bits;	/* regions believed in-sync */
	uint32_t recoverer;	/* nodeid currently recovering a region */
	uint64_t recovering_region; /* -1 means not recovering */
	uint64_t skip_bit_warning; /* used to warn if region skipped */
	int sync_search;	/* next index scanned for resync work */

	int resume_override;	/* resume/bit-load state; 1000 = resumed */

	uint32_t block_on_error;
        enum sync {
                DEFAULTSYNC,    /* Synchronize if necessary */
                NOSYNC,         /* Devices known to be already in sync */
                FORCESYNC,      /* Force a sync to happen */
        } sync;

	uint32_t state;         /* current operational state of the log */

	struct dm_list mark_list;	/* outstanding mark_region entries */

	uint32_t recovery_halted;	/* set at presuspend; stops recovery */
	struct recovery_request *recovery_request_list;	/* priority resync work */

	int disk_fd;            /* -1 means no disk log */
	int log_dev_failed;	/* set when a disk-log write fails */
	uint64_t disk_nr_regions;	/* region count from on-disk header */
	size_t disk_size;       /* size of disk_buffer in bytes */
	void *disk_buffer;      /* aligned memory for O_DIRECT */
	int idx;		/* cursor into resync_history ring */
	char resync_history[RESYNC_HISTORY][128];	/* LOG_SPRINT debug ring */
};
105 
/* One outstanding mark_region request: which node marked which region. */
struct mark_entry {
	struct dm_list list;	/* chained on log_c.mark_list */
	uint32_t nodeid;	/* cluster node that issued the mark */
	uint64_t region;	/* region number that was marked */
};
111 
/* Singly-linked node for priority resync requests. */
struct recovery_request {
	uint64_t region;	/* region needing recovery */
	struct recovery_request *next;
};
116 
/* Official logs (joined the CPG) vs. logs ctr'ed but not yet resumed. */
static DM_LIST_INIT(log_list);
static DM_LIST_INIT(log_pending_list);
119 
120 static int log_test_bit(dm_bitset_t bs, int bit)
121 {
122 	return dm_bit(bs, bit);
123 }
124 
125 static void log_set_bit(struct log_c *lc, dm_bitset_t bs, int bit)
126 {
127 	dm_bit_set(bs, bit);
128 	lc->touched = 1;
129 }
130 
131 static void log_clear_bit(struct log_c *lc, dm_bitset_t bs, int bit)
132 {
133 	dm_bit_clear(bs, bit);
134 	lc->touched = 1;
135 }
136 
137 static int find_next_zero_bit(dm_bitset_t bs, int start)
138 {
139 	while (dm_bit(bs, start++))
140 		if (start >= (int)bs[0])
141 			return -1;
142 
143 	return start - 1;
144 }
145 
146 static uint64_t count_bits32(dm_bitset_t bs)
147 {
148 	int i, size = ((int)bs[0]/DM_BITS_PER_INT + 1);
149 	unsigned count = 0;
150 
151 	for (i = 1; i <= size; i++)
152 		count += hweight32(bs[i]);
153 
154 	return (uint64_t)count;
155 }
156 
157 /*
158  * get_log
159  *
160  * Returns: log if found, NULL otherwise
161  */
162 static struct log_c *get_log(const char *uuid, uint64_t luid)
163 {
164 	struct log_c *lc;
165 
166 	dm_list_iterate_items(lc, &log_list)
167 		if (!strcmp(lc->uuid, uuid) &&
168 		    (!luid || (luid == lc->luid)))
169 			return lc;
170 
171 	return NULL;
172 }
173 
174 /*
175  * get_pending_log
176  *
177  * Pending logs are logs that have been 'clog_ctr'ed, but
178  * have not joined the CPG (via clog_resume).
179  *
180  * Returns: log if found, NULL otherwise
181  */
182 static struct log_c *get_pending_log(const char *uuid, uint64_t luid)
183 {
184 	struct log_c *lc;
185 
186 	dm_list_iterate_items(lc, &log_pending_list)
187 		if (!strcmp(lc->uuid, uuid) &&
188 		    (!luid || (luid == lc->luid)))
189 			return lc;
190 
191 	return NULL;
192 }
193 
194 static void header_to_disk(struct log_header *mem, struct log_header *disk)
195 {
196 	memcpy(disk, mem, sizeof(struct log_header));
197 }
198 
199 static void header_from_disk(struct log_header *mem, struct log_header *disk)
200 {
201 	memcpy(mem, disk, sizeof(struct log_header));
202 }
203 
204 static int rw_log(struct log_c *lc, int do_write)
205 {
206 	int r;
207 
208 	r = lseek(lc->disk_fd, 0, SEEK_SET);
209 	if (r < 0) {
210 		LOG_ERROR("[%s] rw_log:  lseek failure: %s",
211 			  SHORT_UUID(lc->uuid), strerror(errno));
212 		return -errno;
213 	}
214 
215 	if (do_write) {
216 		r = write(lc->disk_fd, lc->disk_buffer, lc->disk_size);
217 		if (r < 0) {
218 			LOG_ERROR("[%s] rw_log:  write failure: %s",
219 				  SHORT_UUID(lc->uuid), strerror(errno));
220 			return -EIO; /* Failed disk write */
221 		}
222 		return 0;
223 	}
224 
225 	/* Read */
226 	r = read(lc->disk_fd, lc->disk_buffer, lc->disk_size);
227 	if (r < 0)
228 		LOG_ERROR("[%s] rw_log:  read failure: %s",
229 			  SHORT_UUID(lc->uuid), strerror(errno));
230 	if (r != lc->disk_size)
231 		return -EIO; /* Failed disk read */
232 	return 0;
233 }
234 
235 /*
236  * read_log
237  * @lc
238  *
239  * Valid return codes:
240  *   -EINVAL:  Invalid header, bits not copied
241  *   -EIO:     Unable to read disk log
242  *    0:       Valid header, disk bit -> lc->clean_bits
243  *
244  * Returns: 0 on success, -EXXX on failure
245  */
246 static int read_log(struct log_c *lc)
247 {
248 	struct log_header lh;
249 	size_t bitset_size;
250 
251 	memset(&lh, 0, sizeof(struct log_header));
252 
253 	if (rw_log(lc, 0))
254 		return -EIO; /* Failed disk read */
255 
256 	header_from_disk(&lh, lc->disk_buffer);
257 	if (lh.magic != MIRROR_MAGIC)
258 		return -EINVAL;
259 
260 	lc->disk_nr_regions = lh.nr_regions;
261 
262 	/* Read disk bits into sync_bits */
263 	bitset_size = lc->region_count / 8;
264 	bitset_size += (lc->region_count % 8) ? 1 : 0;
265 	memcpy(lc->clean_bits, lc->disk_buffer + 1024, bitset_size);
266 
267 	return 0;
268 }
269 
270 /*
271  * write_log
272  * @lc
273  *
274  * Returns: 0 on success, -EIO on failure
275  */
276 static int write_log(struct log_c *lc)
277 {
278 	struct log_header lh;
279 	size_t bitset_size;
280 
281 	lh.magic = MIRROR_MAGIC;
282 	lh.version = MIRROR_DISK_VERSION;
283 	lh.nr_regions = lc->region_count;
284 
285 	header_to_disk(&lh, lc->disk_buffer);
286 
287 	/* Write disk bits from clean_bits */
288 	bitset_size = lc->region_count / 8;
289 	bitset_size += (lc->region_count % 8) ? 1 : 0;
290 	memcpy(lc->disk_buffer + 1024, lc->clean_bits, bitset_size);
291 
292 	if (rw_log(lc, 1)) {
293 		lc->log_dev_failed = 1;
294 		return -EIO; /* Failed disk write */
295 	}
296 	return 0;
297 }
298 
299 static int find_disk_path(char *major_minor_str, char *path_rtn, int *unlink_path)
300 {
301 	int r;
302 	DIR *dp;
303 	struct dirent *dep;
304 	struct stat statbuf;
305 	int major, minor;
306 
307 	if (!strstr(major_minor_str, ":")) {
308 		r = stat(major_minor_str, &statbuf);
309 		if (r)
310 			return -errno;
311 		if (!S_ISBLK(statbuf.st_mode))
312 			return -EINVAL;
313 		sprintf(path_rtn, "%s", major_minor_str);
314 		return 0;
315 	}
316 
317 	r = sscanf(major_minor_str, "%d:%d", &major, &minor);
318 	if (r != 2)
319 		return -EINVAL;
320 
321 	LOG_DBG("Checking /dev/mapper for device %d:%d", major, minor);
322 	/* Check /dev/mapper dir */
323 	dp = opendir("/dev/mapper");
324 	if (!dp)
325 		return -ENOENT;
326 
327 	while ((dep = readdir(dp)) != NULL) {
328 		/*
329 		 * FIXME: This is racy.  By the time the path is used,
330 		 * it may point to something else.  'fstat' will be
331 		 * required upon opening to ensure we got what we
332 		 * wanted.
333 		 */
334 
335 		sprintf(path_rtn, "/dev/mapper/%s", dep->d_name);
336 		stat(path_rtn, &statbuf);
337 		if (S_ISBLK(statbuf.st_mode) &&
338 		    (major(statbuf.st_rdev) == major) &&
339 		    (minor(statbuf.st_rdev) == minor)) {
340 			LOG_DBG("  %s: YES", dep->d_name);
341 			closedir(dp);
342 			return 0;
343 		} else {
344 			LOG_DBG("  %s: NO", dep->d_name);
345 		}
346 	}
347 
348 	closedir(dp);
349 
350 	LOG_DBG("Path not found for %d/%d", major, minor);
351 	LOG_DBG("Creating /dev/mapper/%d-%d", major, minor);
352 	sprintf(path_rtn, "/dev/mapper/%d-%d", major, minor);
353 	r = mknod(path_rtn, S_IFBLK | S_IRUSR | S_IWUSR, MKDEV(major, minor));
354 
355 	/*
356 	 * If we have to make the path, we unlink it after we open it
357 	 */
358 	*unlink_path = 1;
359 
360 	return r ? -errno : 0;
361 }
362 
363 static int _clog_ctr(char *uuid, uint64_t luid,
364 		     int argc, char **argv, uint64_t device_size)
365 {
366 	int i;
367 	int r = 0;
368 	char *p;
369 	uint64_t region_size;
370 	uint64_t region_count;
371 	struct log_c *lc = NULL;
372 	struct log_c *duplicate;
373 	enum sync sync = DEFAULTSYNC;
374 	uint32_t block_on_error = 0;
375 
376 	int disk_log = 0;
377 	char disk_path[128];
378 	int unlink_path = 0;
379 	size_t page_size;
380 	int pages;
381 
382 	/* If core log request, then argv[0] will be region_size */
383 	if (!strtoll(argv[0], &p, 0) || *p) {
384 		disk_log = 1;
385 
386 		if ((argc < 2) || (argc > 4)) {
387 			LOG_ERROR("Too %s arguments to clustered_disk log type",
388 				  (argc < 3) ? "few" : "many");
389 			r = -EINVAL;
390 			goto fail;
391 		}
392 
393 		r = find_disk_path(argv[0], disk_path, &unlink_path);
394 		if (r) {
395 			LOG_ERROR("Unable to find path to device %s", argv[0]);
396 			goto fail;
397 		}
398 		LOG_DBG("Clustered log disk is %s", disk_path);
399 	} else {
400 		disk_log = 0;
401 
402 		if ((argc < 1) || (argc > 3)) {
403 			LOG_ERROR("Too %s arguments to clustered_core log type",
404 				  (argc < 2) ? "few" : "many");
405 			r = -EINVAL;
406 			goto fail;
407 		}
408 	}
409 
410 	if (!(region_size = strtoll(argv[disk_log], &p, 0)) || *p) {
411 		LOG_ERROR("Invalid region_size argument to clustered_%s log type",
412 			  (disk_log) ? "disk" : "core");
413 		r = -EINVAL;
414 		goto fail;
415 	}
416 
417 	region_count = device_size / region_size;
418 	if (device_size % region_size) {
419 		/*
420 		 * I can't remember if device_size must be a multiple
421 		 * of region_size, so check it anyway.
422 		 */
423 		region_count++;
424 	}
425 
426 	for (i = 0; i < argc; i++) {
427 		if (!strcmp(argv[i], "sync"))
428 			sync = FORCESYNC;
429 		else if (!strcmp(argv[i], "nosync"))
430 			sync = NOSYNC;
431 		else if (!strcmp(argv[i], "block_on_error"))
432 			block_on_error = 1;
433 	}
434 
435 	lc = malloc(sizeof(*lc));
436 	if (!lc) {
437 		LOG_ERROR("Unable to allocate cluster log context");
438 		r = -ENOMEM;
439 		goto fail;
440 	}
441 	memset(lc, 0, sizeof(*lc));
442 
443 	lc->region_size = region_size;
444 	lc->region_count = region_count;
445 	lc->sync = sync;
446 	lc->block_on_error = block_on_error;
447 	lc->sync_search = 0;
448 	lc->recovering_region = (uint64_t)-1;
449 	lc->skip_bit_warning = region_count;
450 	lc->disk_fd = -1;
451 	lc->log_dev_failed = 0;
452 	strncpy(lc->uuid, uuid, DM_UUID_LEN);
453 	lc->luid = luid;
454 
455 	if ((duplicate = get_log(lc->uuid, lc->luid)) ||
456 	    (duplicate = get_pending_log(lc->uuid, lc->luid))) {
457 		LOG_ERROR("[%s/%llu] Log already exists, unable to create.",
458 			  SHORT_UUID(lc->uuid), lc->luid);
459 		free(lc);
460 		return -EINVAL;
461 	}
462 
463 	dm_list_init(&lc->mark_list);
464 
465 	lc->clean_bits = dm_bitset_create(NULL, region_count);
466 	if (!lc->clean_bits) {
467 		LOG_ERROR("Unable to allocate clean bitset");
468 		r = -ENOMEM;
469 		goto fail;
470 	}
471 
472 	lc->sync_bits = dm_bitset_create(NULL, region_count);
473 	if (!lc->sync_bits) {
474 		LOG_ERROR("Unable to allocate sync bitset");
475 		r = -ENOMEM;
476 		goto fail;
477 	}
478 	if (sync == NOSYNC)
479 		dm_bit_set_all(lc->sync_bits);
480 
481 	lc->sync_count = (sync == NOSYNC) ? region_count : 0;
482 	if (disk_log) {
483 		page_size = sysconf(_SC_PAGESIZE);
484 		pages = ((int)lc->clean_bits[0])/page_size;
485 		pages += ((int)lc->clean_bits[0])%page_size ? 1 : 0;
486 		pages += 1; /* for header */
487 
488 		r = open(disk_path, O_RDWR | O_DIRECT);
489 		if (r < 0) {
490 			LOG_ERROR("Unable to open log device, %s: %s",
491 				  disk_path, strerror(errno));
492 			r = errno;
493 			goto fail;
494 		}
495 		if (unlink_path)
496 			unlink(disk_path);
497 
498 		lc->disk_fd = r;
499 		lc->disk_size = pages * page_size;
500 
501 		r = posix_memalign(&(lc->disk_buffer), page_size,
502 				   lc->disk_size);
503 		if (r) {
504 			LOG_ERROR("Unable to allocate memory for disk_buffer");
505 			goto fail;
506 		}
507 		memset(lc->disk_buffer, 0, lc->disk_size);
508 		LOG_DBG("Disk log ready");
509 	}
510 
511 	dm_list_add(&log_pending_list, &lc->list);
512 
513 	return 0;
514 fail:
515 	if (lc) {
516 		if (lc->clean_bits)
517 			free(lc->clean_bits);
518 		if (lc->sync_bits)
519 			free(lc->sync_bits);
520 		if (lc->disk_buffer)
521 			free(lc->disk_buffer);
522 		if (lc->disk_fd >= 0)
523 			close(lc->disk_fd);
524 		free(lc);
525 	}
526 	return r;
527 }
528 
529 /*
530  * clog_ctr
531  * @rq
532  *
533  * rq->data should contain constructor string as follows:
534  *	<log_type> [disk] <region_size> [[no]sync] <device_len>
535  * The kernel is responsible for adding the <dev_len> argument
536  * to the end; otherwise, we cannot compute the region_count.
537  *
538  * FIXME: Currently relies on caller to fill in rq->error
539  */
540 static int clog_dtr(struct dm_ulog_request *rq);
541 static int clog_ctr(struct dm_ulog_request *rq)
542 {
543 	int argc, i, r = 0;
544 	char *p, **argv = NULL;
545 	char *dev_size_str;
546 	uint64_t device_size;
547 
548 	/* Sanity checks */
549 	if (!rq->data_size) {
550 		LOG_ERROR("Received constructor request with no data");
551 		return -EINVAL;
552 	}
553 
554 	if (strlen(rq->data) > rq->data_size) {
555 		LOG_ERROR("Received constructor request with bad data");
556 		LOG_ERROR("strlen(rq->data)[%d] != rq->data_size[%llu]",
557 			  (int)strlen(rq->data),
558 			  (unsigned long long)rq->data_size);
559 		LOG_ERROR("rq->data = '%s' [%d]",
560 			  rq->data, (int)strlen(rq->data));
561 		return -EINVAL;
562 	}
563 
564 	/* Split up args */
565 	for (argc = 0, p = rq->data; (p = strstr(p, " ")); p++, argc++)
566 		*p = '\0';
567 
568 	argv = malloc(argc * sizeof(char *));
569 	if (!argv)
570 		return -ENOMEM;
571 
572 	p = dev_size_str = rq->data;
573 	p += strlen(p) + 1;
574 	for (i = 0; i < argc; i++, p = p + strlen(p) + 1)
575 		argv[i] = p;
576 
577 	if (strcmp(argv[0], "clustered_disk") &&
578 	    strcmp(argv[0], "clustered_core")) {
579 		LOG_ERROR("Unsupported userspace log type, \"%s\"", argv[0]);
580 		free(argv);
581 		return -EINVAL;
582 	}
583 
584 	if (!(device_size = strtoll(dev_size_str, &p, 0)) || *p) {
585 		LOG_ERROR("Invalid device size argument: %s", dev_size_str);
586 		free(argv);
587 		return -EINVAL;
588 	}
589 
590 	r = _clog_ctr(rq->uuid, rq->luid, argc - 1, argv + 1, device_size);
591 
592 	/* We join the CPG when we resume */
593 
594 	/* No returning data */
595 	rq->data_size = 0;
596 
597 	if (r) {
598 		LOG_ERROR("Failed to create cluster log (%s)", rq->uuid);
599 		for (i = 0; i < argc; i++)
600 			LOG_ERROR("argv[%d] = %s", i, argv[i]);
601 	}
602 	else
603 		LOG_DBG("[%s] Cluster log created",
604 			SHORT_UUID(rq->uuid));
605 
606 	free(argv);
607 	return r;
608 }
609 
610 /*
611  * clog_dtr
612  * @rq
613  *
614  */
615 static int clog_dtr(struct dm_ulog_request *rq)
616 {
617 	struct log_c *lc = get_log(rq->uuid, rq->luid);
618 
619 	if (lc) {
620 		/*
621 		 * The log should not be on the official list.  There
622 		 * should have been a suspend first.
623 		 */
624 		LOG_ERROR("[%s] DTR before SUS: leaving CPG",
625 			  SHORT_UUID(rq->uuid));
626 		destroy_cluster_cpg(rq->uuid);
627 	} else if (!(lc = get_pending_log(rq->uuid, rq->luid))) {
628 		LOG_ERROR("clog_dtr called on log that is not official or pending");
629 		return -EINVAL;
630 	}
631 
632 	LOG_DBG("[%s] Cluster log removed", SHORT_UUID(lc->uuid));
633 
634 	dm_list_del(&lc->list);
635 	if (lc->disk_fd != -1)
636 		close(lc->disk_fd);
637 	if (lc->disk_buffer)
638 		free(lc->disk_buffer);
639 	free(lc->clean_bits);
640 	free(lc->sync_bits);
641 	free(lc);
642 
643 	return 0;
644 }
645 
646 /*
647  * clog_presuspend
648  * @rq
649  *
650  */
651 static int clog_presuspend(struct dm_ulog_request *rq)
652 {
653 	struct log_c *lc = get_log(rq->uuid, rq->luid);
654 
655 	if (!lc)
656 		return -EINVAL;
657 
658 	if (lc->touched)
659 		LOG_DBG("WARNING: log still marked as 'touched' during suspend");
660 
661 	lc->recovery_halted = 1;
662 
663 	return 0;
664 }
665 
666 /*
667  * clog_postsuspend
668  * @rq
669  *
670  */
671 static int clog_postsuspend(struct dm_ulog_request *rq)
672 {
673 	struct log_c *lc = get_log(rq->uuid, rq->luid);
674 
675 	if (!lc)
676 		return -EINVAL;
677 
678 	LOG_DBG("[%s] clog_postsuspend: leaving CPG", SHORT_UUID(lc->uuid));
679 	destroy_cluster_cpg(rq->uuid);
680 
681 	lc->state = LOG_SUSPENDED;
682 	lc->recovering_region = (uint64_t)-1;
683 	lc->recoverer = (uint32_t)-1;
684 	lc->delay = time(NULL);
685 
686 	return 0;
687 }
688 
689 /*
690  * cluster_postsuspend
691  * @rq
692  *
693  */
694 int cluster_postsuspend(char *uuid, uint64_t luid)
695 {
696 	struct log_c *lc = get_log(uuid, luid);
697 
698 	if (!lc)
699 		return -EINVAL;
700 
701 	LOG_DBG("[%s] clog_postsuspend: finalizing", SHORT_UUID(lc->uuid));
702 	lc->resume_override = 0;
703 
704 	/* move log to pending list */
705 	dm_list_del(&lc->list);
706 	dm_list_add(&log_pending_list, &lc->list);
707 
708 	return 0;
709 }
710 
/*
 * clog_resume
 * @rq
 *
 * Does the main work of resuming.  'resume_override' encodes how much
 * bitset state has been loaded so far:
 *   0    - master resume; bits must be read from the disk log (if any)
 *   1, 2 - only one of sync_bits/clean_bits arrived (error)
 *   3    - non-master resume; both bitsets pre-loaded
 *   1000 - resume already completed
 */
static int clog_resume(struct dm_ulog_request *rq)
{
	uint32_t i;
	int commit_log = 0;
	struct log_c *lc = get_log(rq->uuid, rq->luid);

	if (!lc)
		return -EINVAL;

	switch (lc->resume_override) {
	case 1000:
		/* Resume issued twice without an intervening suspend */
		LOG_ERROR("[%s] Additional resume issued before suspend",
			  SHORT_UUID(rq->uuid));
#ifdef DEBUG
		kill(getpid(), SIGUSR1);
#endif
		return 0;
	case 0:
		lc->resume_override = 1000;
		if (lc->disk_fd == -1) {
			/* Core log: nothing on disk to read */
			LOG_DBG("[%s] Master resume.",
				SHORT_UUID(lc->uuid));
			goto no_disk;
		}

		LOG_DBG("[%s] Master resume: reading disk log",
			SHORT_UUID(lc->uuid));
		commit_log = 1;
		break;
	case 1:
		LOG_ERROR("Error:: partial bit loading (just sync_bits)");
		return -EINVAL;
	case 2:
		LOG_ERROR("Error:: partial bit loading (just clean_bits)");
		return -EINVAL;
	case 3:
		LOG_DBG("[%s] Non-master resume: bits pre-loaded",
			SHORT_UUID(lc->uuid));
		lc->resume_override = 1000;
		goto out;
	default:
		LOG_ERROR("Error:: multiple loading of bits (%d)",
			  lc->resume_override);
		return -EINVAL;
	}

	/* Master path: pull the bitset from the disk log if we can */
	if (lc->log_dev_failed) {
		LOG_ERROR("Log device has failed, unable to read bits");
		rq->error = 0;  /* We can handle this so far */
		lc->disk_nr_regions = 0;
	} else
		rq->error = read_log(lc);

	switch (rq->error) {
	case 0:
		if (lc->disk_nr_regions < lc->region_count)
			LOG_DBG("[%s] Mirror has grown, updating log bits",
				SHORT_UUID(lc->uuid));
		else if (lc->disk_nr_regions > lc->region_count)
			LOG_DBG("[%s] Mirror has shrunk, updating log bits",
				SHORT_UUID(lc->uuid));
		break;
	case -EINVAL:
		/* Bad/missing on-disk header: start from scratch */
		LOG_DBG("[%s] (Re)initializing mirror log - resync issued.",
			SHORT_UUID(lc->uuid));
		lc->disk_nr_regions = 0;
		break;
	default:
		LOG_ERROR("Failed to read disk log");
		lc->disk_nr_regions = 0;
		break;
	}

no_disk:
	/* If mirror has grown, set bits appropriately */
	if (lc->sync == NOSYNC)
		for (i = lc->disk_nr_regions; i < lc->region_count; i++)
			log_set_bit(lc, lc->clean_bits, i);
	else
		for (i = lc->disk_nr_regions; i < lc->region_count; i++)
			log_clear_bit(lc, lc->clean_bits, i);

	/* Clear any old bits if device has shrunk */
	for (i = lc->region_count; i % 32; i++)
		log_clear_bit(lc, lc->clean_bits, i);

	/* copy clean across to sync */
	dm_bit_copy(lc->sync_bits, lc->clean_bits);

	if (commit_log && (lc->disk_fd >= 0)) {
		/* Write the (possibly re-initialized) state back to disk */
		rq->error = write_log(lc);
		if (rq->error)
			LOG_ERROR("Failed initial disk log write");
		else
			LOG_DBG("Disk log initialized");
		lc->touched = 0;
	}
out:
	/*
	 * Clear any old bits if device has shrunk - necessary
	 * for non-master resume
	 */
	for (i = lc->region_count; i % 32; i++) {
		log_clear_bit(lc, lc->clean_bits, i);
		log_clear_bit(lc, lc->sync_bits, i);
	}

	lc->sync_count = count_bits32(lc->sync_bits);

	LOG_SPRINT(lc, "[%s] Initial sync_count = %llu",
		   SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count);
	lc->sync_search = 0;
	lc->state = LOG_RESUMED;
	lc->recovery_halted = 0;

	return rq->error;
}
834 
835 /*
836  * local_resume
837  * @rq
838  *
839  * If the log is pending, we must first join the cpg and
840  * put the log in the official list.
841  *
842  */
843 int local_resume(struct dm_ulog_request *rq)
844 {
845 	int r;
846 	time_t t;
847 	struct log_c *lc = get_log(rq->uuid, rq->luid);
848 
849 	if (!lc) {
850 		/* Is the log in the pending list? */
851 		lc = get_pending_log(rq->uuid, rq->luid);
852 		if (!lc) {
853 			LOG_ERROR("clog_resume called on log that is not official or pending");
854 			return -EINVAL;
855 		}
856 
857 		t = time(NULL);
858 		t -= lc->delay;
859 		/*
860 		 * This should be considered a temporary fix.  It addresses
861 		 * a problem that exists when nodes suspend/resume in rapid
862 		 * succession.  While the problem is very rare, it has been
863 		 * seen to happen in real-world-like testing.
864 		 *
865 		 * The problem:
866 		 * - Node A joins cluster
867 		 * - Node B joins cluster
868 		 * - Node A prepares checkpoint
869 		 * - Node A gets ready to write checkpoint
870 		 * - Node B leaves
871 		 * - Node B joins
872 		 * - Node A finishes write of checkpoint
873 		 * - Node B receives checkpoint meant for previous session
874 		 * -- Node B can now be non-coherent
875 		 *
876 		 * This timer will solve the problem for now, but could be
877 		 * replaced by a generation number sent with the resume
878 		 * command from the kernel.  The generation number would
879 		 * be included in the name of the checkpoint to prevent
880 		 * reading stale data.
881 		 */
882 		if ((t < 3) && (t >= 0))
883 			sleep(3 - t);
884 
885 		/* Join the CPG */
886 		r = create_cluster_cpg(rq->uuid, rq->luid);
887 		if (r) {
888 			LOG_ERROR("clog_resume:  Failed to create cluster CPG");
889 			return r;
890 		}
891 
892 		/* move log to official list */
893 		dm_list_del(&lc->list);
894 		dm_list_add(&log_list, &lc->list);
895 	}
896 
897 	return 0;
898 }
899 
900 /*
901  * clog_get_region_size
902  * @rq
903  *
904  * Since this value doesn't change, the kernel
905  * should not need to talk to server to get this
906  * The function is here for completness
907  *
908  * Returns: 0 on success, -EXXX on failure
909  */
910 static int clog_get_region_size(struct dm_ulog_request *rq)
911 {
912 	uint64_t *rtn = (uint64_t *)rq->data;
913 	struct log_c *lc = get_log(rq->uuid, rq->luid);
914 
915 	if (!lc && !(lc = get_pending_log(rq->uuid, rq->luid)))
916 		return -EINVAL;
917 
918 	*rtn = lc->region_size;
919 	rq->data_size = sizeof(*rtn);
920 
921 	return 0;
922 }
923 
924 /*
925  * clog_is_clean
926  * @rq
927  *
928  * Returns: 1 if clean, 0 otherwise
929  */
930 static int clog_is_clean(struct dm_ulog_request *rq)
931 {
932 	int64_t *rtn = (int64_t *)rq->data;
933 	uint64_t region = *((uint64_t *)(rq->data));
934 	struct log_c *lc = get_log(rq->uuid, rq->luid);
935 
936 	if (!lc)
937 		return -EINVAL;
938 
939 	*rtn = log_test_bit(lc->clean_bits, region);
940 	rq->data_size = sizeof(*rtn);
941 
942 	return 0;
943 }
944 
945 /*
946  * clog_in_sync
947  * @rq
948  *
949  * We ignore any request for non-block.  That
950  * should be handled elsewhere.  (If the request
951  * has come this far, it has already blocked.)
952  *
953  * Returns: 1 if in-sync, 0 otherwise
954  */
955 static int clog_in_sync(struct dm_ulog_request *rq)
956 {
957 	int64_t *rtn = (int64_t *)rq->data;
958 	uint64_t region = *((uint64_t *)(rq->data));
959 	struct log_c *lc = get_log(rq->uuid, rq->luid);
960 
961 	if (!lc)
962 		return -EINVAL;
963 
964 	if (region > lc->region_count)
965 		return -EINVAL;
966 
967 	*rtn = log_test_bit(lc->sync_bits, region);
968 	if (*rtn)
969 		LOG_DBG("[%s] Region is in-sync: %llu",
970 			SHORT_UUID(lc->uuid), (unsigned long long)region);
971 	else
972 		LOG_DBG("[%s] Region is not in-sync: %llu",
973 			SHORT_UUID(lc->uuid), (unsigned long long)region);
974 
975 	rq->data_size = sizeof(*rtn);
976 
977 	return 0;
978 }
979 
980 /*
981  * clog_flush
982  * @rq
983  *
984  */
985 static int clog_flush(struct dm_ulog_request *rq, int server)
986 {
987 	int r = 0;
988 	struct log_c *lc = get_log(rq->uuid, rq->luid);
989 
990 	if (!lc)
991 		return -EINVAL;
992 
993 	if (!lc->touched)
994 		return 0;
995 
996 	/*
997 	 * Do the actual flushing of the log only
998 	 * if we are the server.
999 	 */
1000 	if (server && (lc->disk_fd >= 0)) {
1001 		r = rq->error = write_log(lc);
1002 		if (r)
1003 			LOG_ERROR("[%s] Error writing to disk log",
1004 				  SHORT_UUID(lc->uuid));
1005 		else
1006 			LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid));
1007 	}
1008 
1009 	lc->touched = 0;
1010 
1011 	return r;
1012 
1013 }
1014 
1015 /*
1016  * mark_region
1017  * @lc
1018  * @region
1019  * @who
1020  *
1021  * Put a mark region request in the tree for tracking.
1022  *
1023  * Returns: 0 on success, -EXXX on error
1024  */
1025 static int mark_region(struct log_c *lc, uint64_t region, uint32_t who)
1026 {
1027 	int found = 0;
1028 	struct mark_entry *m;
1029 
1030 	dm_list_iterate_items(m, &lc->mark_list)
1031 		if (m->region == region) {
1032 			found = 1;
1033 			if (m->nodeid == who)
1034 				return 0;
1035 		}
1036 
1037 	if (!found)
1038 		log_clear_bit(lc, lc->clean_bits, region);
1039 
1040 	/*
1041 	 * Save allocation until here - if there is a failure,
1042 	 * at least we have cleared the bit.
1043 	 */
1044 	m = malloc(sizeof(*m));
1045 	if (!m) {
1046 		LOG_ERROR("Unable to allocate space for mark_entry: %llu/%u",
1047 			  (unsigned long long)region, who);
1048 		return -ENOMEM;
1049 	}
1050 
1051 	m->nodeid = who;
1052 	m->region = region;
1053 	dm_list_add(&lc->mark_list, &m->list);
1054 
1055 	return 0;
1056 }
1057 
1058 /*
1059  * clog_mark_region
1060  * @rq
1061  *
1062  * rq may contain more than one mark request.  We
1063  * can determine the number from the 'data_size' field.
1064  *
1065  * Returns: 0 on success, -EXXX on failure
1066  */
1067 static int clog_mark_region(struct dm_ulog_request *rq, uint32_t originator)
1068 {
1069 	int r;
1070 	int count;
1071 	uint64_t *region;
1072 	struct log_c *lc = get_log(rq->uuid, rq->luid);
1073 
1074 	if (!lc)
1075 		return -EINVAL;
1076 
1077 	if (rq->data_size % sizeof(uint64_t)) {
1078 		LOG_ERROR("Bad data size given for mark_region request");
1079 		return -EINVAL;
1080 	}
1081 
1082 	count = rq->data_size / sizeof(uint64_t);
1083 	region = (uint64_t *)&rq->data;
1084 
1085 	for (; count > 0; count--, region++) {
1086 		r = mark_region(lc, *region, originator);
1087 		if (r)
1088 			return r;
1089 	}
1090 
1091 	rq->data_size = 0;
1092 
1093 	return 0;
1094 }
1095 
1096 static int clear_region(struct log_c *lc, uint64_t region, uint32_t who)
1097 {
1098 	int other_matches = 0;
1099 	struct mark_entry *m, *n;
1100 
1101 	dm_list_iterate_items_safe(m, n, &lc->mark_list)
1102 		if (m->region == region) {
1103 			if (m->nodeid == who) {
1104 				dm_list_del(&m->list);
1105 				free(m);
1106 			} else
1107 				other_matches = 1;
1108 		}
1109 
1110 	/*
1111 	 * Clear region if:
1112 	 *  1) It is in-sync
1113 	 *  2) There are no other machines that have it marked
1114 	 */
1115 	if (!other_matches && log_test_bit(lc->sync_bits, region))
1116 		log_set_bit(lc, lc->clean_bits, region);
1117 
1118 	return 0;
1119 }
1120 
1121 /*
1122  * clog_clear_region
1123  * @rq
1124  *
1125  * rq may contain more than one clear request.  We
1126  * can determine the number from the 'data_size' field.
1127  *
1128  * Returns: 0 on success, -EXXX on failure
1129  */
1130 static int clog_clear_region(struct dm_ulog_request *rq, uint32_t originator)
1131 {
1132 	int r;
1133 	int count;
1134 	uint64_t *region;
1135 	struct log_c *lc = get_log(rq->uuid, rq->luid);
1136 
1137 	if (!lc)
1138 		return -EINVAL;
1139 
1140 	if (rq->data_size % sizeof(uint64_t)) {
1141 		LOG_ERROR("Bad data size given for clear_region request");
1142 		return -EINVAL;
1143 	}
1144 
1145 	count = rq->data_size / sizeof(uint64_t);
1146 	region = (uint64_t *)&rq->data;
1147 
1148 	for (; count > 0; count--, region++) {
1149 		r = clear_region(lc, *region, originator);
1150 		if (r)
1151 			return r;
1152 	}
1153 
1154 	rq->data_size = 0;
1155 
1156 	return 0;
1157 }
1158 
/*
 * clog_get_resync_work
 * @rq
 * @originator: nodeid asking for work
 *
 * Hand out at most one region of resync work.  The answer is packed
 * into rq->data as { int64_t i; uint64_t r; }: i != 0 means r holds a
 * region to recover.  Queued priority requests are served before the
 * sequential sync_bits scan, and only one region may be under
 * recovery at a time.
 */
static int clog_get_resync_work(struct dm_ulog_request *rq, uint32_t originator)
{
	struct {
		int64_t i;
		uint64_t r;
	} *pkg = (void *)rq->data;
	struct log_c *lc = get_log(rq->uuid, rq->luid);

	if (!lc)
		return -EINVAL;

	rq->data_size = sizeof(*pkg);
	pkg->i = 0; /* default answer: no work available */

	if (lc->sync_search >= lc->region_count) {
		/*
		 * FIXME: handle intermittent errors during recovery
		 * by resetting sync_search... but not to many times.
		 */
		LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: "
			   "Recovery finished",
			   rq->seq, SHORT_UUID(lc->uuid), originator);
		return 0;
	}

	/* A region is already out for recovery */
	if (lc->recovering_region != (uint64_t)-1) {
		if (lc->recoverer == originator) {
			/* Same node asking again: re-issue the same region */
			LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: "
				   "Re-requesting work (%llu)",
				   rq->seq, SHORT_UUID(lc->uuid), originator,
				   (unsigned long long)lc->recovering_region);
			pkg->r = lc->recovering_region;
			pkg->i = 1;
			LOG_COND(log_resend_requests, "***** RE-REQUEST *****");
		} else {
			LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: "
				   "Someone already recovering (%llu)",
				   rq->seq, SHORT_UUID(lc->uuid), originator,
				   (unsigned long long)lc->recovering_region);
		}

		return 0;
	}

	/* Serve queued priority requests first, skipping in-sync ones */
	while (lc->recovery_request_list) {
		struct recovery_request *del;

		del = lc->recovery_request_list;
		lc->recovery_request_list = del->next;

		pkg->r = del->region;
		free(del);

		if (!log_test_bit(lc->sync_bits, pkg->r)) {
			LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: "
				   "Assigning priority resync work (%llu)",
				   rq->seq, SHORT_UUID(lc->uuid), originator,
				   (unsigned long long)pkg->r);
			pkg->i = 1;
			lc->recovering_region = pkg->r;
			lc->recoverer = originator;
			return 0;
		}
	}

	/* Sequential scan for the next out-of-sync region */
	pkg->r = find_next_zero_bit(lc->sync_bits,
				    lc->sync_search);

	/* find_next_zero_bit's -1 becomes a huge uint64_t => "complete" */
	if (pkg->r >= lc->region_count) {
		LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: "
			   "Resync work complete.",
			   rq->seq, SHORT_UUID(lc->uuid), originator);
		return 0;
	}

	lc->sync_search = pkg->r + 1;

	LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: "
		   "Assigning resync work (%llu)",
		   rq->seq, SHORT_UUID(lc->uuid), originator,
		   (unsigned long long)pkg->r);
	pkg->i = 1;
	lc->recovering_region = pkg->r;
	lc->recoverer = originator;

	return 0;
}
1251 
1252 /*
1253  * clog_set_region_sync
1254  * @rq
1255  */
1256 static int clog_set_region_sync(struct dm_ulog_request *rq, uint32_t originator)
1257 {
1258 	struct {
1259 		uint64_t region;
1260 		int64_t in_sync;
1261 	} *pkg = (void *)rq->data;
1262 	struct log_c *lc = get_log(rq->uuid, rq->luid);
1263 
1264 	if (!lc)
1265 		return -EINVAL;
1266 
1267 	lc->recovering_region = (uint64_t)-1;
1268 
1269 	if (pkg->in_sync) {
1270 		if (log_test_bit(lc->sync_bits, pkg->region)) {
1271 			LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1272 				   "Region already set (%llu)",
1273 				   rq->seq, SHORT_UUID(lc->uuid), originator,
1274 				   (unsigned long long)pkg->region);
1275 		} else {
1276 			log_set_bit(lc, lc->sync_bits, pkg->region);
1277 			lc->sync_count++;
1278 
1279 			/* The rest of this section is all for debugging */
1280 			LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1281 				   "Setting region (%llu)",
1282 				   rq->seq, SHORT_UUID(lc->uuid), originator,
1283 				   (unsigned long long)pkg->region);
1284 			if (pkg->region == lc->skip_bit_warning)
1285 				lc->skip_bit_warning = lc->region_count;
1286 
1287 			if (pkg->region > (lc->skip_bit_warning + 5)) {
1288 				LOG_ERROR("*** Region #%llu skipped during recovery ***",
1289 					  (unsigned long long)lc->skip_bit_warning);
1290 				lc->skip_bit_warning = lc->region_count;
1291 #ifdef DEBUG
1292 				kill(getpid(), SIGUSR1);
1293 #endif
1294 			}
1295 
1296 			if (!log_test_bit(lc->sync_bits,
1297 					  (pkg->region) ? pkg->region - 1 : 0)) {
1298 				LOG_SPRINT(lc, "*** Previous bit not set ***");
1299 				lc->skip_bit_warning = (pkg->region) ?
1300 					pkg->region - 1 : 0;
1301 			}
1302 		}
1303 	} else if (log_test_bit(lc->sync_bits, pkg->region)) {
1304 		lc->sync_count--;
1305 		log_clear_bit(lc, lc->sync_bits, pkg->region);
1306 		LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1307 			   "Unsetting region (%llu)",
1308 			   rq->seq, SHORT_UUID(lc->uuid), originator,
1309 			   (unsigned long long)pkg->region);
1310 	}
1311 
1312 	if (lc->sync_count != count_bits32(lc->sync_bits)) {
1313 		unsigned long long reset = count_bits32(lc->sync_bits);
1314 
1315 		LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1316 			   "sync_count(%llu) != bitmap count(%llu)",
1317 			   rq->seq, SHORT_UUID(lc->uuid), originator,
1318 			   (unsigned long long)lc->sync_count, reset);
1319 #ifdef DEBUG
1320 		kill(getpid(), SIGUSR1);
1321 #endif
1322 		lc->sync_count = reset;
1323 	}
1324 
1325 	if (lc->sync_count > lc->region_count)
1326 		LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: "
1327 			   "(lc->sync_count > lc->region_count) - this is bad",
1328 			   rq->seq, SHORT_UUID(lc->uuid), originator);
1329 
1330 	rq->data_size = 0;
1331 	return 0;
1332 }
1333 
1334 /*
1335  * clog_get_sync_count
1336  * @rq
1337  */
1338 static int clog_get_sync_count(struct dm_ulog_request *rq, uint32_t originator)
1339 {
1340 	uint64_t *sync_count = (uint64_t *)rq->data;
1341 	struct log_c *lc = get_log(rq->uuid, rq->luid);
1342 
1343 	/*
1344 	 * FIXME: Mirror requires us to be able to ask for
1345 	 * the sync count while pending... but I don't like
1346 	 * it because other machines may not be suspended and
1347 	 * the stored value may not be accurate.
1348 	 */
1349 	if (!lc)
1350 		lc = get_pending_log(rq->uuid, rq->luid);
1351 
1352 	if (!lc)
1353 		return -EINVAL;
1354 
1355 	*sync_count = lc->sync_count;
1356 
1357 	rq->data_size = sizeof(*sync_count);
1358 
1359 	if (lc->sync_count != count_bits32(lc->sync_bits)) {
1360 		unsigned long long reset = count_bits32(lc->sync_bits);
1361 
1362 		LOG_SPRINT(lc, "get_sync_count - SEQ#=%u, UUID=%s, nodeid = %u:: "
1363 			   "sync_count(%llu) != bitmap count(%llu)",
1364 			   rq->seq, SHORT_UUID(lc->uuid), originator,
1365 			   (unsigned long long)lc->sync_count, reset);
1366 #ifdef DEBUG
1367 		kill(getpid(), SIGUSR1);
1368 #endif
1369 		lc->sync_count = reset;
1370 	}
1371 
1372 	return 0;
1373 }
1374 
1375 static int core_status_info(struct log_c *lc, struct dm_ulog_request *rq)
1376 {
1377 	char *data = (char *)rq->data;
1378 
1379 	rq->data_size = sprintf(data, "1 clustered_core");
1380 
1381 	return 0;
1382 }
1383 
1384 static int disk_status_info(struct log_c *lc, struct dm_ulog_request *rq)
1385 {
1386 	char *data = (char *)rq->data;
1387 	struct stat statbuf;
1388 
1389 	if(fstat(lc->disk_fd, &statbuf)) {
1390 		rq->error = -errno;
1391 		return -errno;
1392 	}
1393 
1394 	rq->data_size = sprintf(data, "3 clustered_disk %d:%d %c",
1395 				major(statbuf.st_rdev), minor(statbuf.st_rdev),
1396 				(lc->log_dev_failed) ? 'D' : 'A');
1397 
1398 	return 0;
1399 }
1400 
1401 /*
1402  * clog_status_info
1403  * @rq
1404  *
1405  */
1406 static int clog_status_info(struct dm_ulog_request *rq)
1407 {
1408 	int r;
1409 	struct log_c *lc = get_log(rq->uuid, rq->luid);
1410 
1411 	if (!lc)
1412 		lc = get_pending_log(rq->uuid, rq->luid);
1413 
1414 	if (!lc)
1415 		return -EINVAL;
1416 
1417 	if (lc->disk_fd == -1)
1418 		r = core_status_info(lc, rq);
1419 	else
1420 		r = disk_status_info(lc, rq);
1421 
1422 	return r;
1423 }
1424 
1425 static int core_status_table(struct log_c *lc, struct dm_ulog_request *rq)
1426 {
1427 	char *data = (char *)rq->data;
1428 
1429 	rq->data_size = sprintf(data, "clustered_core %u %s%s ",
1430 				lc->region_size,
1431 				(lc->sync == DEFAULTSYNC) ? "" :
1432 				(lc->sync == NOSYNC) ? "nosync " : "sync ",
1433 				(lc->block_on_error) ? "block_on_error" : "");
1434 	return 0;
1435 }
1436 
1437 static int disk_status_table(struct log_c *lc, struct dm_ulog_request *rq)
1438 {
1439 	char *data = (char *)rq->data;
1440 	struct stat statbuf;
1441 
1442 	if(fstat(lc->disk_fd, &statbuf)) {
1443 		rq->error = -errno;
1444 		return -errno;
1445 	}
1446 
1447 	rq->data_size = sprintf(data, "clustered_disk %d:%d %u %s%s ",
1448 				major(statbuf.st_rdev), minor(statbuf.st_rdev),
1449 				lc->region_size,
1450 				(lc->sync == DEFAULTSYNC) ? "" :
1451 				(lc->sync == NOSYNC) ? "nosync " : "sync ",
1452 				(lc->block_on_error) ? "block_on_error" : "");
1453 	return 0;
1454 }
1455 
1456 /*
1457  * clog_status_table
1458  * @rq
1459  *
1460  */
1461 static int clog_status_table(struct dm_ulog_request *rq)
1462 {
1463 	int r;
1464 	struct log_c *lc = get_log(rq->uuid, rq->luid);
1465 
1466 	if (!lc)
1467 		lc = get_pending_log(rq->uuid, rq->luid);
1468 
1469 	if (!lc)
1470 		return -EINVAL;
1471 
1472 	if (lc->disk_fd == -1)
1473 		r = core_status_table(lc, rq);
1474 	else
1475 		r = disk_status_table(lc, rq);
1476 
1477 	return r;
1478 }
1479 
1480 /*
1481  * clog_is_remote_recovering
1482  * @rq
1483  *
1484  */
1485 static int clog_is_remote_recovering(struct dm_ulog_request *rq)
1486 {
1487 	uint64_t region = *((uint64_t *)(rq->data));
1488 	struct {
1489 		int64_t is_recovering;
1490 		uint64_t in_sync_hint;
1491 	} *pkg = (void *)rq->data;
1492 	struct log_c *lc = get_log(rq->uuid, rq->luid);
1493 
1494 	if (!lc)
1495 		return -EINVAL;
1496 
1497 	if (region > lc->region_count)
1498 		return -EINVAL;
1499 
1500 	if (lc->recovery_halted) {
1501 		LOG_DBG("[%s] Recovery halted... [not remote recovering]: %llu",
1502 			SHORT_UUID(lc->uuid), (unsigned long long)region);
1503 		pkg->is_recovering = 0;
1504 		pkg->in_sync_hint = lc->region_count; /* none are recovering */
1505 	} else {
1506 		pkg->is_recovering = !log_test_bit(lc->sync_bits, region);
1507 
1508 		/*
1509 		 * Remember, 'lc->sync_search' is 1 plus the region
1510 		 * currently being recovered.  So, we must take off 1
1511 		 * to account for that; but only if 'sync_search > 1'.
1512 		 */
1513 		pkg->in_sync_hint = lc->sync_search ? (lc->sync_search - 1) : 0;
1514 		LOG_DBG("[%s] Region is %s: %llu",
1515 			SHORT_UUID(lc->uuid),
1516 			(region == lc->recovering_region) ?
1517 			"currently remote recovering" :
1518 			(pkg->is_recovering) ? "pending remote recovery" :
1519 			"not remote recovering", (unsigned long long)region);
1520 	}
1521 
1522 	if (pkg->is_recovering &&
1523 	    (region != lc->recovering_region)) {
1524 		struct recovery_request *rr;
1525 
1526 		/* Already in the list? */
1527 		for (rr = lc->recovery_request_list; rr; rr = rr->next)
1528 			if (rr->region == region)
1529 				goto out;
1530 
1531 		/* Failure to allocated simply means we can't prioritize it */
1532 		rr = malloc(sizeof(*rr));
1533 		if (!rr)
1534 			goto out;
1535 
1536 		LOG_DBG("[%s] Adding region to priority list: %llu",
1537 			SHORT_UUID(lc->uuid), (unsigned long long)region);
1538 		rr->region = region;
1539 		rr->next = lc->recovery_request_list;
1540 		lc->recovery_request_list = rr;
1541 	}
1542 
1543 out:
1544 
1545 	rq->data_size = sizeof(*pkg);
1546 
1547 	return 0;
1548 }
1549 
1550 
1551 /*
1552  * do_request
1553  * @rq: the request
1554  * @server: is this request performed by the server
1555  *
1556  * An inability to perform this function will return an error
1557  * from this function.  However, an inability to successfully
1558  * perform the request will fill in the 'rq->error' field.
1559  *
1560  * Returns: 0 on success, -EXXX on error
1561  */
1562 int do_request(struct clog_request *rq, int server)
1563 {
1564 	int r;
1565 
1566 	if (!rq)
1567 		return 0;
1568 
1569 	if (rq->u_rq.error)
1570 		LOG_DBG("Programmer error: rq struct has error set");
1571 
1572 	switch (rq->u_rq.request_type) {
1573 	case DM_ULOG_CTR:
1574 		r = clog_ctr(&rq->u_rq);
1575 		break;
1576 	case DM_ULOG_DTR:
1577 		r = clog_dtr(&rq->u_rq);
1578 		break;
1579 	case DM_ULOG_PRESUSPEND:
1580 		r = clog_presuspend(&rq->u_rq);
1581 		break;
1582 	case DM_ULOG_POSTSUSPEND:
1583 		r = clog_postsuspend(&rq->u_rq);
1584 		break;
1585 	case DM_ULOG_RESUME:
1586 		r = clog_resume(&rq->u_rq);
1587 		break;
1588 	case DM_ULOG_GET_REGION_SIZE:
1589 		r = clog_get_region_size(&rq->u_rq);
1590 		break;
1591 	case DM_ULOG_IS_CLEAN:
1592 		r = clog_is_clean(&rq->u_rq);
1593 		break;
1594 	case DM_ULOG_IN_SYNC:
1595 		r = clog_in_sync(&rq->u_rq);
1596 		break;
1597 	case DM_ULOG_FLUSH:
1598 		r = clog_flush(&rq->u_rq, server);
1599 		break;
1600 	case DM_ULOG_MARK_REGION:
1601 		r = clog_mark_region(&rq->u_rq, rq->originator);
1602 		break;
1603 	case DM_ULOG_CLEAR_REGION:
1604 		r = clog_clear_region(&rq->u_rq, rq->originator);
1605 		break;
1606 	case DM_ULOG_GET_RESYNC_WORK:
1607 		r = clog_get_resync_work(&rq->u_rq, rq->originator);
1608 		break;
1609 	case DM_ULOG_SET_REGION_SYNC:
1610 		r = clog_set_region_sync(&rq->u_rq, rq->originator);
1611 		break;
1612 	case DM_ULOG_GET_SYNC_COUNT:
1613 		r = clog_get_sync_count(&rq->u_rq, rq->originator);
1614 		break;
1615 	case DM_ULOG_STATUS_INFO:
1616 		r = clog_status_info(&rq->u_rq);
1617 		break;
1618 	case DM_ULOG_STATUS_TABLE:
1619 		r = clog_status_table(&rq->u_rq);
1620 		break;
1621 	case DM_ULOG_IS_REMOTE_RECOVERING:
1622 		r = clog_is_remote_recovering(&rq->u_rq);
1623 		break;
1624 	default:
1625 		LOG_ERROR("Unknown request");
1626 		r = rq->u_rq.error = -EINVAL;
1627 		break;
1628 	}
1629 
1630 	if (r && !rq->u_rq.error)
1631 		rq->u_rq.error = r;
1632 	else if (r != rq->u_rq.error)
1633 		LOG_DBG("Warning:  error from function != rq->u_rq.error");
1634 
1635 	if (rq->u_rq.error && rq->u_rq.data_size) {
1636 		/* Make sure I'm handling errors correctly above */
1637 		LOG_DBG("Programmer error: rq->u_rq.error && rq->u_rq.data_size");
1638 		rq->u_rq.data_size = 0;
1639 	}
1640 
1641 	return 0;
1642 }
1643 
/*
 * print_bits
 * @buf: raw bytes to dump
 * @size: number of bytes in @buf
 * @print: non-zero => LOG_PRINT, zero => LOG_DBG
 *
 * Hex-dump @buf, 16 bytes per line, each line prefixed with its
 * "[lo - hi]" byte-index range.
 */
static void print_bits(char *buf, int size, int print)
{
	char line[128];
	int i;

	line[0] = '\0';

	for (i = 0; i < size; i++) {
		if (!(i % 16)) {
			/* Flush the completed line before starting a new one. */
			if (line[0] != '\0') {
				if (print)
					LOG_PRINT("%s", line);
				else
					LOG_DBG("%s", line);
			}
			sprintf(line, "[%3d - %3d]", i, i + 15);
		}
		sprintf(line + strlen(line), " %.2X", (unsigned char)buf[i]);
	}

	/* Flush the final (possibly partial) line. */
	if (line[0] != '\0') {
		if (print)
			LOG_PRINT("%s", line);
		else
			LOG_DBG("%s", line);
	}
}
1671 
1672 /* int store_bits(const char *uuid, const char *which, char **buf)*/
1673 int push_state(const char *uuid, uint64_t luid,
1674 	       const char *which, char **buf, uint32_t debug_who)
1675 {
1676 	int bitset_size;
1677 	struct log_c *lc;
1678 
1679 	if (*buf)
1680 		LOG_ERROR("store_bits: *buf != NULL");
1681 
1682 	lc = get_log(uuid, luid);
1683 	if (!lc) {
1684 		LOG_ERROR("store_bits: No log found for %s", uuid);
1685 		return -EINVAL;
1686 	}
1687 
1688 	if (!strcmp(which, "recovering_region")) {
1689 		*buf = malloc(64); /* easily handles the 2 written numbers */
1690 		if (!*buf)
1691 			return -ENOMEM;
1692 		sprintf(*buf, "%llu %u", (unsigned long long)lc->recovering_region,
1693 			lc->recoverer);
1694 
1695 		LOG_SPRINT(lc, "CKPT SEND - SEQ#=X, UUID=%s, nodeid = %u:: "
1696 			   "recovering_region=%llu, recoverer=%u, sync_count=%llu",
1697 			   SHORT_UUID(lc->uuid), debug_who,
1698 			   (unsigned long long)lc->recovering_region,
1699 			   lc->recoverer,
1700 			   (unsigned long long)count_bits32(lc->sync_bits));
1701 		return 64;
1702 	}
1703 
1704 	/* Size in 'int's */
1705 	bitset_size = ((int)lc->clean_bits[0]/DM_BITS_PER_INT) + 1;
1706 
1707 	/* Size in bytes */
1708 	bitset_size *= 4;
1709 
1710 	*buf = malloc(bitset_size);
1711 
1712 	if (!*buf) {
1713 		LOG_ERROR("store_bits: Unable to allocate memory");
1714 		return -ENOMEM;
1715 	}
1716 
1717 	if (!strncmp(which, "sync_bits", 9)) {
1718 		memcpy(*buf, lc->sync_bits + 1, bitset_size);
1719 		LOG_DBG("[%s] storing sync_bits (sync_count = %llu):",
1720 			SHORT_UUID(uuid), (unsigned long long)
1721 			count_bits32(lc->sync_bits));
1722 		print_bits(*buf, bitset_size, 0);
1723 	} else if (!strncmp(which, "clean_bits", 9)) {
1724 		memcpy(*buf, lc->clean_bits + 1, bitset_size);
1725 		LOG_DBG("[%s] storing clean_bits:", SHORT_UUID(lc->uuid));
1726 		print_bits(*buf, bitset_size, 0);
1727 	}
1728 
1729 	return bitset_size;
1730 }
1731 
1732 /*int load_bits(const char *uuid, const char *which, char *buf, int size)*/
1733 int pull_state(const char *uuid, uint64_t luid,
1734 	       const char *which, char *buf, int size)
1735 {
1736 	int bitset_size;
1737 	struct log_c *lc;
1738 
1739 	if (!buf)
1740 		LOG_ERROR("pull_state: buf == NULL");
1741 
1742 	lc = get_log(uuid, luid);
1743 	if (!lc) {
1744 		LOG_ERROR("pull_state: No log found for %s", uuid);
1745 		return -EINVAL;
1746 	}
1747 
1748 	if (!strncmp(which, "recovering_region", 17)) {
1749 		sscanf(buf, "%llu %u", (unsigned long long *)&lc->recovering_region,
1750 		       &lc->recoverer);
1751 		LOG_SPRINT(lc, "CKPT INIT - SEQ#=X, UUID=%s, nodeid = X:: "
1752 			   "recovering_region=%llu, recoverer=%u",
1753 			   SHORT_UUID(lc->uuid),
1754 			   (unsigned long long)lc->recovering_region, lc->recoverer);
1755 		return 0;
1756 	}
1757 
1758 	/* Size in 'int's */
1759 	bitset_size = ((int)lc->clean_bits[0]/DM_BITS_PER_INT) + 1;
1760 
1761 	/* Size in bytes */
1762 	bitset_size *= 4;
1763 
1764 	if (bitset_size != size) {
1765 		LOG_ERROR("pull_state(%s): bad bitset_size (%d vs %d)",
1766 			  which, size, bitset_size);
1767 		return -EINVAL;
1768 	}
1769 
1770 	if (!strncmp(which, "sync_bits", 9)) {
1771 		lc->resume_override += 1;
1772 		memcpy(lc->sync_bits + 1, buf, bitset_size);
1773 		LOG_DBG("[%s] loading sync_bits (sync_count = %llu):",
1774 			SHORT_UUID(lc->uuid),(unsigned long long)
1775 			count_bits32(lc->sync_bits));
1776 		print_bits((char *)lc->sync_bits, bitset_size, 0);
1777 	} else if (!strncmp(which, "clean_bits", 9)) {
1778 		lc->resume_override += 2;
1779 		memcpy(lc->clean_bits + 1, buf, bitset_size);
1780 		LOG_DBG("[%s] loading clean_bits:", SHORT_UUID(lc->uuid));
1781 		print_bits((char *)lc->clean_bits, bitset_size, 0);
1782 	}
1783 
1784 	return 0;
1785 }
1786 
1787 int log_get_state(struct dm_ulog_request *rq)
1788 {
1789 	struct log_c *lc;
1790 
1791 	lc = get_log(rq->uuid, rq->luid);
1792 	if (!lc)
1793 		return -EINVAL;
1794 
1795 	return lc->state;
1796 }
1797 
1798 /*
1799  * log_status
1800  *
1801  * Returns: 1 if logs are still present, 0 otherwise
1802  */
1803 int log_status(void)
1804 {
1805 	if (!dm_list_empty(&log_list) || !dm_list_empty(&log_pending_list))
1806 		return 1;
1807 
1808 	return 0;
1809 }
1810 
1811 void log_debug(void)
1812 {
1813 	struct log_c *lc;
1814 	uint64_t r;
1815 	int i;
1816 
1817 	LOG_ERROR("");
1818 	LOG_ERROR("LOG COMPONENT DEBUGGING::");
1819 	LOG_ERROR("Official log list:");
1820 	LOG_ERROR("Pending log list:");
1821 	dm_list_iterate_items(lc, &log_pending_list) {
1822 		LOG_ERROR("%s", lc->uuid);
1823 		LOG_ERROR("sync_bits:");
1824 		print_bits((char *)lc->sync_bits, (int)lc->sync_bits[0], 1);
1825 		LOG_ERROR("clean_bits:");
1826 		print_bits((char *)lc->clean_bits, (int)lc->sync_bits[0], 1);
1827 	}
1828 
1829 	dm_list_iterate_items(lc, &log_list) {
1830 		LOG_ERROR("%s", lc->uuid);
1831 		LOG_ERROR("  recoverer        : %u", lc->recoverer);
1832 		LOG_ERROR("  recovering_region: %llu",
1833 			  (unsigned long long)lc->recovering_region);
1834 		LOG_ERROR("  recovery_halted  : %s", (lc->recovery_halted) ?
1835 			  "YES" : "NO");
1836 		LOG_ERROR("sync_bits:");
1837 		print_bits((char *)lc->sync_bits, (int)lc->sync_bits[0], 1);
1838 		LOG_ERROR("clean_bits:");
1839 		print_bits((char *)lc->clean_bits, (int)lc->sync_bits[0], 1);
1840 
1841 		LOG_ERROR("Validating %s::", SHORT_UUID(lc->uuid));
1842 		r = find_next_zero_bit(lc->sync_bits, 0);
1843 		LOG_ERROR("  lc->region_count = %llu",
1844 			  (unsigned long long)lc->region_count);
1845 		LOG_ERROR("  lc->sync_count = %llu",
1846 			  (unsigned long long)lc->sync_count);
1847 		LOG_ERROR("  next zero bit  = %llu",
1848 			  (unsigned long long)r);
1849 		if ((r > lc->region_count) ||
1850 		    ((r == lc->region_count) && (lc->sync_count > lc->region_count))) {
1851 			LOG_ERROR("ADJUSTING SYNC_COUNT");
1852 			lc->sync_count = lc->region_count;
1853 		}
1854 
1855 		LOG_ERROR("Resync request history:");
1856 		for (i = 0; i < RESYNC_HISTORY; i++) {
1857 			lc->idx++;
1858 			lc->idx = lc->idx % RESYNC_HISTORY;
1859 			if (lc->resync_history[lc->idx][0] == '\0')
1860 				continue;
1861 			LOG_ERROR("%d:%d) %s", i, lc->idx,
1862 				  lc->resync_history[lc->idx]);
1863 		}
1864 	}
1865 }
1866