/*
 * Small tool to check for dedupable blocks in a file or device. Basically
 * just scans the file or device in extents of the given size, checksums
 * them, and tallies how often each checksum is seen.
 */
#include <assert.h>
#include <fcntl.h>
#include <inttypes.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/stat.h>

#include "../fio.h"
#include "../flist.h"
#include "../log.h"
#include "../fio_sem.h"
#include "../smalloc.h"
#include "../minmax.h"
#include "../crc/md5.h"
#include "../os/os.h"
#include "../gettime.h"
#include "../fio_time.h"
#include "../lib/rbtree.h"

#include "../lib/bloom.h"
#include "debug.h"
#include "zlib.h"

struct zlib_ctrl {
	z_stream stream;
	unsigned char *buf_in;
	unsigned char *buf_out;
};

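/*
 * Per-thread state: a private zlib context, the chunk currently being
 * scanned, and result counters the main thread sums up after joining.
 */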
struct worker_thread {
	struct zlib_ctrl zc;
	pthread_t thread;
	uint64_t cur_offset;
	uint64_t size;
	unsigned long long unique_capacity;
	unsigned long items;
	unsigned long dupes;
	int err;
	int fd;
	volatile int done;
};

struct extent {
	struct flist_head list;
	uint64_t offset;
};

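/*
 * One node in the global rb-tree, keyed by MD5 hash. count is the number
 * of extents that hashed to this value; the optional extent_list records
 * their offsets when -d or -c is used.
 */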
struct chunk {
	struct fio_rb_node rb_node;
	uint64_t count;
	uint32_t hash[MD5_HASH_WORDS];
	struct flist_head extent_list[0];
};

struct item {
	uint64_t offset;
	uint32_t hash[MD5_HASH_WORDS];
};

static struct rb_root rb_root;
static struct bloom *bloom;
static struct fio_sem *rb_lock;

static unsigned int blocksize = 4096;
static unsigned int num_threads;
static unsigned int chunk_size = 1048576;
static unsigned int dump_output;
static unsigned int odirect;
static unsigned int collision_check;
static unsigned int print_progress = 1;
static unsigned int use_bloom = 1;
static unsigned int compression = 0;

static uint64_t total_size;
static uint64_t cur_offset;
static struct fio_sem *size_lock;

static struct fio_file file;

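/*
 * Return the size of the file or block device, rounded down to a multiple
 * of the block size. Returns 0 on failure.
 */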
static uint64_t get_size(struct fio_file *f, struct stat *sb)
{
	uint64_t ret;

	if (S_ISBLK(sb->st_mode)) {
		unsigned long long bytes = 0;

		if (blockdev_size(f, &bytes)) {
			log_err("dedupe: failed getting bdev size\n");
			return 0;
		}
		ret = bytes;
	} else {
		ret = sb->st_size;
	}

	return (ret & ~((uint64_t)blocksize - 1));
}

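/*
 * Hand the next chunk of the device to a worker thread. Returns non-zero
 * once the whole device has been parceled out.
 */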
static int get_work(uint64_t *offset, uint64_t *size)
{
	uint64_t this_chunk;
	int ret = 1;

	fio_sem_down(size_lock);

	if (cur_offset < total_size) {
		*offset = cur_offset;
		this_chunk = min((uint64_t)chunk_size, total_size - cur_offset);
		*size = this_chunk;
		cur_offset += this_chunk;
		ret = 0;
	}

	fio_sem_up(size_lock);
	return ret;
}

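/*
 * pread() wrapper that treats EOF and short reads as errors, since every
 * read here is expected to cover a whole number of blocks.
 */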
static int __read_block(int fd, void *buf, off_t offset, size_t count)
{
	ssize_t ret;

	ret = pread(fd, buf, count, offset);
	if (ret < 0) {
		perror("pread");
		return 1;
	} else if (!ret) {
		return 1;
	} else if (ret != count) {
		log_err("dedupe: short read on block\n");
		return 1;
	}

	return 0;
}

static int read_block(int fd, void *buf, off_t offset)
{
	return __read_block(fd, buf, offset, blocksize);
}

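/*
 * For a block that has not been seen before, deflate it and add the
 * compressed length to the running "unique capacity" total (-C option).
 */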
static void account_unique_capacity(uint64_t offset, uint64_t *unique_capacity,
				    struct zlib_ctrl *zc)
{
	z_stream *stream = &zc->stream;
	unsigned int compressed_len;
	size_t outlen;
	int ret;

	if (read_block(file.fd, zc->buf_in, offset))
		return;

	outlen = deflateBound(stream, blocksize);
	stream->next_in = zc->buf_in;
	stream->avail_in = blocksize;
	stream->avail_out = outlen;
	stream->next_out = zc->buf_out;

	ret = deflate(stream, Z_FINISH);
	assert(ret != Z_STREAM_ERROR);
	/* avail_out started at the deflate bound, not at blocksize */
	compressed_len = outlen - stream->avail_out;

	if (dump_output)
		printf("offset 0x%lx compressed to %u blocksize %u ratio %.2f\n",
				(unsigned long) offset, compressed_len, blocksize,
				(float)compressed_len / (float)blocksize);

	*unique_capacity += compressed_len;
	deflateReset(stream);
}

static void add_item(struct chunk *c, struct item *i)
{
	/*
	 * Save some memory and don't add extent items, if we don't
	 * use them.
	 */
	if (dump_output || collision_check) {
		struct extent *e;

		e = malloc(sizeof(*e));
		e->offset = i->offset;
		flist_add_tail(&e->list, &c->extent_list[0]);
	}

	c->count++;
}

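/*
 * Full collision check: re-read the chunk's first extent and the new
 * extent and compare them byte for byte. Returns 0 only on a real match.
 */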
static int col_check(struct chunk *c, struct item *i)
{
	struct extent *e;
	char *cbuf, *ibuf;
	int ret = 1;

	cbuf = fio_memalign(blocksize, blocksize, false);
	ibuf = fio_memalign(blocksize, blocksize, false);

	e = flist_entry(c->extent_list[0].next, struct extent, list);
	if (read_block(file.fd, cbuf, e->offset))
		goto out;

	if (read_block(file.fd, ibuf, i->offset))
		goto out;

	ret = memcmp(ibuf, cbuf, blocksize);
out:
	fio_memfree(cbuf, blocksize, false);
	fio_memfree(ibuf, blocksize, false);
	return ret;
}

static struct chunk *alloc_chunk(void)
{
	struct chunk *c;

	if (collision_check || dump_output) {
		c = malloc(sizeof(struct chunk) + sizeof(struct flist_head));
		INIT_FLIST_HEAD(&c->extent_list[0]);
	} else {
		c = malloc(sizeof(struct chunk));
	}

	return c;
}

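/*
 * Look the hash up in the rb-tree (caller holds rb_lock). A matching node
 * means a duplicate extent; otherwise a new chunk is inserted and, with
 * -C, its compressed size is accounted.
 */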
static void insert_chunk(struct item *i, uint64_t *unique_capacity,
			 struct zlib_ctrl *zc)
{
	struct fio_rb_node **p, *parent;
	struct chunk *c;
	int diff;

	p = &rb_root.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;

		c = rb_entry(parent, struct chunk, rb_node);
		diff = memcmp(i->hash, c->hash, sizeof(i->hash));
		if (diff < 0) {
			p = &(*p)->rb_left;
		} else if (diff > 0) {
			p = &(*p)->rb_right;
		} else {
			int ret;

			if (!collision_check)
				goto add;

			fio_sem_up(rb_lock);
			ret = col_check(c, i);
			fio_sem_down(rb_lock);

			if (!ret)
				goto add;

			p = &(*p)->rb_right;
		}
	}

	c = alloc_chunk();
	RB_CLEAR_NODE(&c->rb_node);
	c->count = 0;
	memcpy(c->hash, i->hash, sizeof(i->hash));
	rb_link_node(&c->rb_node, parent, p);
	rb_insert_color(&c->rb_node, &rb_root);
	if (compression)
		account_unique_capacity(i->offset, unique_capacity, zc);
add:
	add_item(c, i);
}

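/*
 * Insert a batch of hashed items. With the bloom filter the duplicate
 * count is probabilistic; otherwise each item goes into the rb-tree.
 */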
static void insert_chunks(struct item *items, unsigned int nitems,
			  uint64_t *ndupes, uint64_t *unique_capacity,
			  struct zlib_ctrl *zc)
{
	int i;

	fio_sem_down(rb_lock);

	for (i = 0; i < nitems; i++) {
		if (bloom) {
			unsigned int s;
			int r;

			s = sizeof(items[i].hash) / sizeof(uint32_t);
			r = bloom_set(bloom, items[i].hash, s);
			*ndupes += r;
		} else
			insert_chunk(&items[i], unique_capacity, zc);
	}

	fio_sem_up(rb_lock);
}

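/*
 * MD5 one block into the 128-bit hash used as the rb-tree/bloom key.
 */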
static void crc_buf(void *buf, uint32_t *hash)
{
	struct fio_md5_ctx ctx = { .hash = hash };

	fio_md5_init(&ctx);
	fio_md5_update(&ctx, buf, blocksize);
	fio_md5_final(&ctx);
}

static unsigned int read_blocks(int fd, void *buf, off_t offset, size_t size)
{
	if (__read_block(fd, buf, offset, size))
		return 0;

	return size / blocksize;
}

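/*
 * Process one chunk: read it, hash every block in it, insert the resulting
 * items, and fold the per-chunk counters into the thread totals.
 */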
static int do_work(struct worker_thread *thread, void *buf)
{
	unsigned int nblocks, i;
	off_t offset;
	int nitems = 0;
	uint64_t ndupes = 0;
	uint64_t unique_capacity = 0;
	struct item *items;

	offset = thread->cur_offset;

	nblocks = read_blocks(thread->fd, buf, offset,
				min(thread->size, (uint64_t) chunk_size));
	if (!nblocks)
		return 1;

	items = malloc(sizeof(*items) * nblocks);

	for (i = 0; i < nblocks; i++) {
		void *thisptr = buf + (i * blocksize);

		items[i].offset = offset;
		crc_buf(thisptr, items[i].hash);
		offset += blocksize;
		nitems++;
	}

	insert_chunks(items, nitems, &ndupes, &unique_capacity, &thread->zc);

	free(items);
	thread->items += nitems;
	thread->dupes += ndupes;
	thread->unique_capacity += unique_capacity;
	return 0;
}

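/*
 * Set up the per-thread deflate stream and its aligned in/out buffers.
 */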
static void thread_init_zlib_control(struct worker_thread *thread)
{
	size_t sz;

	z_stream *stream = &thread->zc.stream;
	stream->zalloc = Z_NULL;
	stream->zfree = Z_NULL;
	stream->opaque = Z_NULL;

	if (deflateInit(stream, Z_DEFAULT_COMPRESSION) != Z_OK)
		return;

	thread->zc.buf_in = fio_memalign(blocksize, blocksize, false);
	sz = deflateBound(stream, blocksize);
	thread->zc.buf_out = fio_memalign(blocksize, sz, false);
}

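/*
 * Worker main loop: grab work and hash it until the device is exhausted
 * or an error occurs.
 */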
static void *thread_fn(void *data)
{
	struct worker_thread *thread = data;
	void *buf;

	buf = fio_memalign(blocksize, chunk_size, false);
	thread_init_zlib_control(thread);

	do {
		if (get_work(&thread->cur_offset, &thread->size)) {
			thread->err = 1;
			break;
		}
		if (do_work(thread, buf)) {
			thread->err = 1;
			break;
		}
	} while (1);

	thread->done = 1;
	fio_memfree(buf, chunk_size, false);
	return NULL;
}

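/*
 * Poll the worker counters every 250ms and print a percentage/throughput
 * line until the first thread reports that it is done.
 */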
static void show_progress(struct worker_thread *threads, unsigned long total)
{
	unsigned long last_nitems = 0;
	struct timespec last_tv;

	fio_gettime(&last_tv, NULL);

	while (print_progress) {
		unsigned long this_items;
		unsigned long nitems = 0;
		uint64_t tdiff;
		float perc;
		int some_done = 0;
		int i;

		for (i = 0; i < num_threads; i++) {
			nitems += threads[i].items;
			some_done = threads[i].done;
			if (some_done)
				break;
		}

		if (some_done)
			break;

		perc = (float) nitems / (float) total;
		perc *= 100.0;
		this_items = nitems - last_nitems;
		this_items *= blocksize;
		tdiff = mtime_since_now(&last_tv);
		if (tdiff) {
			this_items = (this_items * 1000) / (tdiff * 1024);
			printf("%3.2f%% done (%luKiB/sec)\r", perc, this_items);
			last_nitems = nitems;
			fio_gettime(&last_tv, NULL);
		} else {
			printf("%3.2f%% done\r", perc);
		}
		fflush(stdout);
		usleep(250000);
	}
}

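/*
 * Spawn the worker threads, wait for them, and reduce their per-thread
 * counters into total extents, unique chunks and unique capacity.
 */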
static int run_dedupe_threads(struct fio_file *f, uint64_t dev_size,
			      uint64_t *nextents, uint64_t *nchunks,
			      uint64_t *unique_capacity)
{
	struct worker_thread *threads;
	unsigned long nitems, total_items;
	int i, err = 0;

	total_size = dev_size;
	total_items = dev_size / blocksize;
	cur_offset = 0;
	size_lock = fio_sem_init(FIO_SEM_UNLOCKED);

	threads = malloc(num_threads * sizeof(struct worker_thread));
	for (i = 0; i < num_threads; i++) {
		memset(&threads[i], 0, sizeof(struct worker_thread));
		threads[i].fd = f->fd;

		err = pthread_create(&threads[i].thread, NULL, thread_fn, &threads[i]);
		if (err) {
			log_err("fio: thread startup failed\n");
			break;
		}
	}

	show_progress(threads, total_items);

	nitems = 0;
	*nextents = 0;
	*nchunks = 1;
	*unique_capacity = 0;
	for (i = 0; i < num_threads; i++) {
		void *ret;
		pthread_join(threads[i].thread, &ret);
		nitems += threads[i].items;
		*nchunks += threads[i].dupes;
		*unique_capacity += threads[i].unique_capacity;
	}

	printf("Threads(%u): %lu items processed\n", num_threads, nitems);

	*nextents = nitems;
	*nchunks = nitems - *nchunks;

	fio_sem_remove(size_lock);
	free(threads);
	return err;
}

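/*
 * Open the target, determine its size, optionally allocate the bloom
 * filter, and run the scan.
 */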
static int dedupe_check(const char *filename, uint64_t *nextents,
			uint64_t *nchunks, uint64_t *unique_capacity)
{
	uint64_t dev_size;
	struct stat sb;
	int flags;

	flags = O_RDONLY;
	if (odirect)
		flags |= OS_O_DIRECT;

	memset(&file, 0, sizeof(file));
	file.file_name = strdup(filename);

	file.fd = open(filename, flags);
	if (file.fd == -1) {
		perror("open");
		goto err;
	}

	if (fstat(file.fd, &sb) < 0) {
		perror("fstat");
		goto err;
	}

	dev_size = get_size(&file, &sb);
	if (!dev_size)
		goto err;

	if (use_bloom) {
		uint64_t bloom_entries;

		bloom_entries = 8 * (dev_size / blocksize);
		bloom = bloom_new(bloom_entries);
	}

	printf("Will check <%s>, size <%llu>, using %u threads\n", filename,
				(unsigned long long) dev_size, num_threads);

	return run_dedupe_threads(&file, dev_size, nextents, nchunks,
					unique_capacity);
err:
	if (file.fd != -1)
		close(file.fd);
	free(file.file_name);
	return 1;
}

static void show_chunk(struct chunk *c)
{
	struct flist_head *n;
	struct extent *e;

	printf("c hash %8x %8x %8x %8x, count %lu\n", c->hash[0], c->hash[1],
			c->hash[2], c->hash[3], (unsigned long) c->count);
	flist_for_each(n, &c->extent_list[0]) {
		e = flist_entry(n, struct extent, list);
		printf("\toffset %llu\n", (unsigned long long) e->offset);
	}
}

static const char *capacity_unit[] = {"B", "KB", "MB", "GB", "TB", "PB", "EB"};

static uint64_t bytes_to_human_readable_unit(uint64_t n, const char **unit_out)
{
	uint8_t i = 0;

	while (n >= 1024) {
		i++;
		n /= 1024;
	}

	*unit_out = capacity_unit[i];
	return n;
}

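/*
 * Print the summary: extent counts, dedupe ratio, the equivalent fio
 * dedupe_percentage setting, and (with -C) the compressed unique capacity.
 */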
static void show_stat(uint64_t nextents, uint64_t nchunks, uint64_t ndupextents,
		      uint64_t unique_capacity)
{
	double perc, ratio;
	const char *unit;
	uint64_t uc_human;

	printf("Extents=%lu, Unique extents=%lu", (unsigned long) nextents,
						(unsigned long) nchunks);
	if (!bloom)
		printf(" Duplicated extents=%lu", (unsigned long) ndupextents);
	printf("\n");

	if (nchunks) {
		ratio = (double) nextents / (double) nchunks;
		printf("De-dupe ratio: 1:%3.2f\n", ratio - 1.0);
	} else {
		printf("De-dupe ratio: 1:infinite\n");
	}

	if (ndupextents) {
		printf("De-dupe working set at least: %3.2f%%\n",
			100.0 * (double) ndupextents / (double) nextents);
	}

	perc = 1.00 - ((double) nchunks / (double) nextents);
	perc *= 100.0;
	printf("Fio setting: dedupe_percentage=%u\n", (int) (perc + 0.50));

	if (compression) {
		uc_human = bytes_to_human_readable_unit(unique_capacity, &unit);
		printf("Unique capacity %lu%s\n", (unsigned long) uc_human, unit);
	}
}

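/*
 * Walk the rb-tree to produce the exact extent/chunk/duplicate counts
 * (only used when the bloom filter is disabled).
 */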
static void iter_rb_tree(uint64_t *nextents, uint64_t *nchunks, uint64_t *ndupextents)
{
	struct fio_rb_node *n;

	*nchunks = *nextents = *ndupextents = 0;

	n = rb_first(&rb_root);
	if (!n)
		return;

	do {
		struct chunk *c;

		c = rb_entry(n, struct chunk, rb_node);
		(*nchunks)++;
		*nextents += c->count;
		*ndupextents += (c->count > 1);

		if (dump_output)
			show_chunk(c);

	} while ((n = rb_next(n)) != NULL);
}

static int usage(char *argv[])
{
	log_err("Check for dedupable blocks on a device/file\n\n");
	log_err("%s: [options] <device or file>\n", argv[0]);
	log_err("\t-b\tBlock size to use\n");
	log_err("\t-t\tNumber of threads to use\n");
	log_err("\t-d\tFull extent/chunk debug output\n");
	log_err("\t-o\tUse O_DIRECT\n");
	log_err("\t-c\tFull collision check\n");
	log_err("\t-B\tUse probabilistic bloom filter\n");
	log_err("\t-p\tPrint progress indicator\n");
	log_err("\t-C\tCalculate compressible size\n");
	return 1;
}

int main(int argc, char *argv[])
{
	uint64_t nextents = 0, nchunks = 0, ndupextents = 0, unique_capacity;
	int c, ret;

	arch_init(argv);
	debug_init();

	while ((c = getopt(argc, argv, "b:t:d:o:c:p:B:C:")) != -1) {
		switch (c) {
		case 'b':
			blocksize = atoi(optarg);
			break;
		case 't':
			num_threads = atoi(optarg);
			break;
		case 'd':
			dump_output = atoi(optarg);
			break;
		case 'o':
			odirect = atoi(optarg);
			break;
		case 'c':
			collision_check = atoi(optarg);
			break;
		case 'p':
			print_progress = atoi(optarg);
			break;
		case 'B':
			use_bloom = atoi(optarg);
			break;
		case 'C':
			compression = atoi(optarg);
			break;
		case '?':
		default:
			return usage(argv);
		}
	}

	if (collision_check || dump_output || compression)
		use_bloom = 0;

	if (!num_threads)
		num_threads = cpus_online();

	if (argc == optind)
		return usage(argv);

	sinit();

	rb_root = RB_ROOT;
	rb_lock = fio_sem_init(FIO_SEM_UNLOCKED);

	ret = dedupe_check(argv[optind], &nextents, &nchunks, &unique_capacity);

	if (!ret) {
		if (!bloom)
			iter_rb_tree(&nextents, &nchunks, &ndupextents);

		show_stat(nextents, nchunks, ndupextents, unique_capacity);
	}

	fio_sem_remove(rb_lock);
	if (bloom)
		bloom_free(bloom);
	scleanup();
	return ret;
}