1 /*
2 * Small tool to check for dedupable blocks in a file or device. Basically
3 * just scans the filename for extents of the given size, checksums them,
4 * and orders them up.
5 */
6 #include <fcntl.h>
7 #include <inttypes.h>
8 #include <stdio.h>
9 #include <string.h>
10 #include <unistd.h>
11 #include <sys/stat.h>
12
13 #include "../fio.h"
14 #include "../flist.h"
15 #include "../log.h"
16 #include "../fio_sem.h"
17 #include "../smalloc.h"
18 #include "../minmax.h"
19 #include "../crc/md5.h"
20 #include "../os/os.h"
21 #include "../gettime.h"
22 #include "../fio_time.h"
23 #include "../lib/rbtree.h"
24
25 #include "../lib/bloom.h"
26 #include "debug.h"
27 #include "zlib.h"
28
/*
 * Per-thread zlib state used by the -C (compressible capacity) option.
 * Buffers are sized in thread_init_zlib_control().
 */
struct zlib_ctrl {
	z_stream stream;	/* persistent deflate stream, reset after each block */
	unsigned char *buf_in;	/* one blocksize-sized input block */
	unsigned char *buf_out;	/* deflateBound()-sized compression output */
};
34
/*
 * State for one scanning thread. Threads pull work via get_work() and
 * accumulate their own counters; totals are summed after join.
 */
struct worker_thread {
	struct zlib_ctrl zc;		/* per-thread compression state (-C) */
	pthread_t thread;
	uint64_t cur_offset;		/* start of the chunk currently scanned */
	uint64_t size;			/* size of the chunk currently scanned */
	unsigned long long unique_capacity;	/* compressed bytes of unique blocks */
	unsigned long items;		/* blocks hashed by this thread */
	unsigned long dupes;		/* duplicates seen (bloom mode only) */
	int err;			/* set non-zero when the thread stops */
	int fd;				/* shared fd of the scanned file/device */
	volatile int done;		/* completion flag polled by show_progress() */
};
47
/*
 * One on-disk location of a chunk's contents; linked off
 * chunk->extent_list when -d or -c tracking is enabled.
 */
struct extent {
	struct flist_head list;
	uint64_t offset;	/* byte offset within the scanned file/device */
};
52
/*
 * One unique block, keyed by its MD5 hash, stored in the global rb tree.
 */
struct chunk {
	struct fio_rb_node rb_node;
	uint64_t count;			/* number of extents sharing this hash */
	uint32_t hash[MD5_HASH_WORDS];
	/*
	 * Zero-length trailing member: storage for the list head is only
	 * allocated when -d/-c need per-chunk extent lists, see
	 * alloc_chunk(). Saves a flist_head per chunk otherwise.
	 */
	struct flist_head extent_list[0];
};
59
/*
 * Transient per-block record produced by do_work(): where the block
 * lives and what it hashes to. Consumed by insert_chunks().
 */
struct item {
	uint64_t offset;
	uint32_t hash[MD5_HASH_WORDS];
};
64
/* Tree of unique chunks keyed by MD5; unused when the bloom filter is active */
static struct rb_root rb_root;
/* Probabilistic duplicate filter, used in the default fast mode (-B 1) */
static struct bloom *bloom;
/* Serializes rb tree / bloom filter updates across worker threads */
static struct fio_sem *rb_lock;

static unsigned int blocksize = 4096;	/* dedupe block size (-b) */
static unsigned int num_threads;	/* worker count (-t), 0 = cpus_online() */
static unsigned int chunk_size = 1048576;	/* bytes read per get_work() unit */
static unsigned int dump_output;	/* -d: dump every chunk and its extents */
static unsigned int odirect;		/* -o: open with O_DIRECT */
static unsigned int collision_check;	/* -c: byte-compare on hash match */
static unsigned int print_progress = 1;	/* -p: periodic progress line */
static unsigned int use_bloom = 1;	/* -B: bloom mode (forced off by -c/-d/-C) */
static unsigned int compression = 0;	/* -C: account compressed unique capacity */

static uint64_t total_size;		/* rounded-down size of the scanned target */
static uint64_t cur_offset;		/* next offset to hand out, under size_lock */
static struct fio_sem *size_lock;	/* protects cur_offset */

/* The single file/device being scanned; fd is shared by all threads */
static struct fio_file file;
84
get_size(struct fio_file * f,struct stat * sb)85 static uint64_t get_size(struct fio_file *f, struct stat *sb)
86 {
87 uint64_t ret;
88
89 if (S_ISBLK(sb->st_mode)) {
90 unsigned long long bytes = 0;
91
92 if (blockdev_size(f, &bytes)) {
93 log_err("dedupe: failed getting bdev size\n");
94 return 0;
95 }
96 ret = bytes;
97 } else {
98 ret = sb->st_size;
99 }
100
101 return (ret & ~((uint64_t)blocksize - 1));
102 }
103
get_work(uint64_t * offset,uint64_t * size)104 static int get_work(uint64_t *offset, uint64_t *size)
105 {
106 uint64_t this_chunk;
107 int ret = 1;
108
109 fio_sem_down(size_lock);
110
111 if (cur_offset < total_size) {
112 *offset = cur_offset;
113 this_chunk = min((uint64_t)chunk_size, total_size - cur_offset);
114 *size = this_chunk;
115 cur_offset += this_chunk;
116 ret = 0;
117 }
118
119 fio_sem_up(size_lock);
120 return ret;
121 }
122
/*
 * pread() exactly 'count' bytes at 'offset' into 'buf'. Returns 0 on
 * success; 1 on error, EOF, or short read (partial reads are not retried).
 */
static int __read_block(int fd, void *buf, off_t offset, size_t count)
{
	ssize_t ret;

	ret = pread(fd, buf, count, offset);
	if (ret < 0) {
		perror("pread");
		return 1;
	} else if (!ret) {
		/* EOF - nothing left to hash */
		return 1;
	} else if (ret != (ssize_t) count) {
		/* explicit cast avoids signed/unsigned comparison of ssize_t vs size_t */
		log_err("dedupe: short read on block\n");
		return 1;
	}

	return 0;
}
140
/*
 * Read one dedupe block (global 'blocksize' bytes) at 'offset'.
 * Returns 0 on success, non-zero on error/EOF/short read.
 */
static int read_block(int fd, void *buf, off_t offset)
{
	return __read_block(fd, buf, offset, blocksize);
}
145
account_unique_capacity(uint64_t offset,uint64_t * unique_capacity,struct zlib_ctrl * zc)146 static void account_unique_capacity(uint64_t offset, uint64_t *unique_capacity,
147 struct zlib_ctrl *zc)
148 {
149 z_stream *stream = &zc->stream;
150 unsigned int compressed_len;
151 int ret;
152
153 if (read_block(file.fd, zc->buf_in, offset))
154 return;
155
156 stream->next_in = zc->buf_in;
157 stream->avail_in = blocksize;
158 stream->avail_out = deflateBound(stream, blocksize);
159 stream->next_out = zc->buf_out;
160
161 ret = deflate(stream, Z_FINISH);
162 assert(ret != Z_STREAM_ERROR);
163 compressed_len = blocksize - stream->avail_out;
164
165 if (dump_output)
166 printf("offset 0x%lx compressed to %d blocksize %d ratio %.2f \n",
167 (unsigned long) offset, compressed_len, blocksize,
168 (float)compressed_len / (float)blocksize);
169
170 *unique_capacity += compressed_len;
171 deflateReset(stream);
172 }
173
add_item(struct chunk * c,struct item * i)174 static void add_item(struct chunk *c, struct item *i)
175 {
176 /*
177 * Save some memory and don't add extent items, if we don't
178 * use them.
179 */
180 if (dump_output || collision_check) {
181 struct extent *e;
182
183 e = malloc(sizeof(*e));
184 e->offset = i->offset;
185 flist_add_tail(&e->list, &c->extent_list[0]);
186 }
187
188 c->count++;
189 }
190
col_check(struct chunk * c,struct item * i)191 static int col_check(struct chunk *c, struct item *i)
192 {
193 struct extent *e;
194 char *cbuf, *ibuf;
195 int ret = 1;
196
197 cbuf = fio_memalign(blocksize, blocksize, false);
198 ibuf = fio_memalign(blocksize, blocksize, false);
199
200 e = flist_entry(c->extent_list[0].next, struct extent, list);
201 if (read_block(file.fd, cbuf, e->offset))
202 goto out;
203
204 if (read_block(file.fd, ibuf, i->offset))
205 goto out;
206
207 ret = memcmp(ibuf, cbuf, blocksize);
208 out:
209 fio_memfree(cbuf, blocksize, false);
210 fio_memfree(ibuf, blocksize, false);
211 return ret;
212 }
213
alloc_chunk(void)214 static struct chunk *alloc_chunk(void)
215 {
216 struct chunk *c;
217
218 if (collision_check || dump_output) {
219 c = malloc(sizeof(struct chunk) + sizeof(struct flist_head));
220 INIT_FLIST_HEAD(&c->extent_list[0]);
221 } else {
222 c = malloc(sizeof(struct chunk));
223 }
224
225 return c;
226 }
227
/*
 * Insert item 'i' into the rb tree of unique chunks, keyed by MD5 hash.
 * Called with rb_lock held. On a hash match with -c enabled, the block
 * contents are byte-compared; a mismatch (true collision) is treated as
 * a distinct chunk and pushed down the right subtree.
 */
static void insert_chunk(struct item *i, uint64_t *unique_capacity,
			 struct zlib_ctrl *zc)
{
	struct fio_rb_node **p, *parent;
	struct chunk *c;
	int diff;

	p = &rb_root.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;

		c = rb_entry(parent, struct chunk, rb_node);
		diff = memcmp(i->hash, c->hash, sizeof(i->hash));
		if (diff < 0) {
			p = &(*p)->rb_left;
		} else if (diff > 0) {
			p = &(*p)->rb_right;
		} else {
			int ret;

			if (!collision_check)
				goto add;

			/*
			 * Drop the lock across the disk reads in col_check().
			 * NOTE(review): other threads may modify the tree
			 * while it is dropped, so 'p'/'parent' could go
			 * stale before the insert below - confirm whether
			 * this race is acceptable here.
			 */
			fio_sem_up(rb_lock);
			ret = col_check(c, i);
			fio_sem_down(rb_lock);

			if (!ret)
				goto add;

			/* Same hash, different contents: keep walking right */
			p = &(*p)->rb_right;
		}
	}

	/* New unique chunk: link it and, with -C, account its compressed size */
	c = alloc_chunk();
	RB_CLEAR_NODE(&c->rb_node);
	c->count = 0;
	memcpy(c->hash, i->hash, sizeof(i->hash));
	rb_link_node(&c->rb_node, parent, p);
	rb_insert_color(&c->rb_node, &rb_root);
	if (compression)
		account_unique_capacity(i->offset, unique_capacity, zc);
add:
	add_item(c, i);
}
274
insert_chunks(struct item * items,unsigned int nitems,uint64_t * ndupes,uint64_t * unique_capacity,struct zlib_ctrl * zc)275 static void insert_chunks(struct item *items, unsigned int nitems,
276 uint64_t *ndupes, uint64_t *unique_capacity,
277 struct zlib_ctrl *zc)
278 {
279 int i;
280
281 fio_sem_down(rb_lock);
282
283 for (i = 0; i < nitems; i++) {
284 if (bloom) {
285 unsigned int s;
286 int r;
287
288 s = sizeof(items[i].hash) / sizeof(uint32_t);
289 r = bloom_set(bloom, items[i].hash, s);
290 *ndupes += r;
291 } else
292 insert_chunk(&items[i], unique_capacity, zc);
293 }
294
295 fio_sem_up(rb_lock);
296 }
297
/*
 * MD5 one blocksize-sized buffer; the digest lands in 'hash'
 * (MD5_HASH_WORDS 32-bit words), which the caller owns.
 */
static void crc_buf(void *buf, uint32_t *hash)
{
	struct fio_md5_ctx ctx = { .hash = hash };

	fio_md5_init(&ctx);
	fio_md5_update(&ctx, buf, blocksize);
	fio_md5_final(&ctx);
}
306
/*
 * Read 'size' bytes at 'offset' and return how many whole dedupe
 * blocks that covers; 0 on any read failure.
 */
static unsigned int read_blocks(int fd, void *buf, off_t offset, size_t size)
{
	int failed = __read_block(fd, buf, offset, size);

	return failed ? 0 : size / blocksize;
}
314
do_work(struct worker_thread * thread,void * buf)315 static int do_work(struct worker_thread *thread, void *buf)
316 {
317 unsigned int nblocks, i;
318 off_t offset;
319 int nitems = 0;
320 uint64_t ndupes = 0;
321 uint64_t unique_capacity = 0;
322 struct item *items;
323
324 offset = thread->cur_offset;
325
326 nblocks = read_blocks(thread->fd, buf, offset,
327 min(thread->size, (uint64_t) chunk_size));
328 if (!nblocks)
329 return 1;
330
331 items = malloc(sizeof(*items) * nblocks);
332
333 for (i = 0; i < nblocks; i++) {
334 void *thisptr = buf + (i * blocksize);
335
336 items[i].offset = offset;
337 crc_buf(thisptr, items[i].hash);
338 offset += blocksize;
339 nitems++;
340 }
341
342 insert_chunks(items, nitems, &ndupes, &unique_capacity, &thread->zc);
343
344 free(items);
345 thread->items += nitems;
346 thread->dupes += ndupes;
347 thread->unique_capacity += unique_capacity;
348 return 0;
349 }
350
thread_init_zlib_control(struct worker_thread * thread)351 static void thread_init_zlib_control(struct worker_thread *thread)
352 {
353 size_t sz;
354
355 z_stream *stream = &thread->zc.stream;
356 stream->zalloc = Z_NULL;
357 stream->zfree = Z_NULL;
358 stream->opaque = Z_NULL;
359
360 if (deflateInit(stream, Z_DEFAULT_COMPRESSION) != Z_OK)
361 return;
362
363 thread->zc.buf_in = fio_memalign(blocksize, blocksize, false);
364 sz = deflateBound(stream, blocksize);
365 thread->zc.buf_out = fio_memalign(blocksize, sz, false);
366 }
367
thread_fn(void * data)368 static void *thread_fn(void *data)
369 {
370 struct worker_thread *thread = data;
371 void *buf;
372
373 buf = fio_memalign(blocksize, chunk_size, false);
374 thread_init_zlib_control(thread);
375
376 do {
377 if (get_work(&thread->cur_offset, &thread->size)) {
378 thread->err = 1;
379 break;
380 }
381 if (do_work(thread, buf)) {
382 thread->err = 1;
383 break;
384 }
385 } while (1);
386
387 thread->done = 1;
388 fio_memfree(buf, chunk_size, false);
389 return NULL;
390 }
391
show_progress(struct worker_thread * threads,unsigned long total)392 static void show_progress(struct worker_thread *threads, unsigned long total)
393 {
394 unsigned long last_nitems = 0;
395 struct timespec last_tv;
396
397 fio_gettime(&last_tv, NULL);
398
399 while (print_progress) {
400 unsigned long this_items;
401 unsigned long nitems = 0;
402 uint64_t tdiff;
403 float perc;
404 int some_done = 0;
405 int i;
406
407 for (i = 0; i < num_threads; i++) {
408 nitems += threads[i].items;
409 some_done = threads[i].done;
410 if (some_done)
411 break;
412 }
413
414 if (some_done)
415 break;
416
417 perc = (float) nitems / (float) total;
418 perc *= 100.0;
419 this_items = nitems - last_nitems;
420 this_items *= blocksize;
421 tdiff = mtime_since_now(&last_tv);
422 if (tdiff) {
423 this_items = (this_items * 1000) / (tdiff * 1024);
424 printf("%3.2f%% done (%luKiB/sec)\r", perc, this_items);
425 last_nitems = nitems;
426 fio_gettime(&last_tv, NULL);
427 } else {
428 printf("%3.2f%% done\r", perc);
429 }
430 fflush(stdout);
431 usleep(250000);
432 };
433 }
434
run_dedupe_threads(struct fio_file * f,uint64_t dev_size,uint64_t * nextents,uint64_t * nchunks,uint64_t * unique_capacity)435 static int run_dedupe_threads(struct fio_file *f, uint64_t dev_size,
436 uint64_t *nextents, uint64_t *nchunks,
437 uint64_t *unique_capacity)
438 {
439 struct worker_thread *threads;
440 unsigned long nitems, total_items;
441 int i, err = 0;
442
443 total_size = dev_size;
444 total_items = dev_size / blocksize;
445 cur_offset = 0;
446 size_lock = fio_sem_init(FIO_SEM_UNLOCKED);
447
448 threads = malloc(num_threads * sizeof(struct worker_thread));
449 for (i = 0; i < num_threads; i++) {
450 memset(&threads[i], 0, sizeof(struct worker_thread));
451 threads[i].fd = f->fd;
452
453 err = pthread_create(&threads[i].thread, NULL, thread_fn, &threads[i]);
454 if (err) {
455 log_err("fio: thread startup failed\n");
456 break;
457 }
458 }
459
460 show_progress(threads, total_items);
461
462 nitems = 0;
463 *nextents = 0;
464 *nchunks = 1;
465 *unique_capacity = 0;
466 for (i = 0; i < num_threads; i++) {
467 void *ret;
468 pthread_join(threads[i].thread, &ret);
469 nitems += threads[i].items;
470 *nchunks += threads[i].dupes;
471 *unique_capacity += threads[i].unique_capacity;
472 }
473
474 printf("Threads(%u): %lu items processed\n", num_threads, nitems);
475
476 *nextents = nitems;
477 *nchunks = nitems - *nchunks;
478
479 fio_sem_remove(size_lock);
480 free(threads);
481 return err;
482 }
483
dedupe_check(const char * filename,uint64_t * nextents,uint64_t * nchunks,uint64_t * unique_capacity)484 static int dedupe_check(const char *filename, uint64_t *nextents,
485 uint64_t *nchunks, uint64_t *unique_capacity)
486 {
487 uint64_t dev_size;
488 struct stat sb;
489 int flags;
490
491 flags = O_RDONLY;
492 if (odirect)
493 flags |= OS_O_DIRECT;
494
495 memset(&file, 0, sizeof(file));
496 file.file_name = strdup(filename);
497
498 file.fd = open(filename, flags);
499 if (file.fd == -1) {
500 perror("open");
501 goto err;
502 }
503
504 if (fstat(file.fd, &sb) < 0) {
505 perror("fstat");
506 goto err;
507 }
508
509 dev_size = get_size(&file, &sb);
510 if (!dev_size)
511 goto err;
512
513 if (use_bloom) {
514 uint64_t bloom_entries;
515
516 bloom_entries = 8 * (dev_size / blocksize);
517 bloom = bloom_new(bloom_entries);
518 }
519
520 printf("Will check <%s>, size <%llu>, using %u threads\n", filename,
521 (unsigned long long) dev_size, num_threads);
522
523 return run_dedupe_threads(&file, dev_size, nextents, nchunks,
524 unique_capacity);
525 err:
526 if (file.fd != -1)
527 close(file.fd);
528 free(file.file_name);
529 return 1;
530 }
531
show_chunk(struct chunk * c)532 static void show_chunk(struct chunk *c)
533 {
534 struct flist_head *n;
535 struct extent *e;
536
537 printf("c hash %8x %8x %8x %8x, count %lu\n", c->hash[0], c->hash[1],
538 c->hash[2], c->hash[3], (unsigned long) c->count);
539 flist_for_each(n, &c->extent_list[0]) {
540 e = flist_entry(n, struct extent, list);
541 printf("\toffset %llu\n", (unsigned long long) e->offset);
542 }
543 }
544
static const char *capacity_unit[] = {"b","KB", "MB", "GB", "TB", "PB", "EB"};

/*
 * Scale a byte count down by powers of 1024 and report the matching
 * unit via *unit_out. A uint64_t never exceeds the "EB" entry, so the
 * table cannot be overrun.
 */
static uint64_t bytes_to_human_readable_unit(uint64_t n, const char **unit_out)
{
	uint8_t idx;

	for (idx = 0; n >= 1024; idx++)
		n >>= 10;	/* divide by 1024 */

	*unit_out = capacity_unit[idx];
	return n;
}
559
show_stat(uint64_t nextents,uint64_t nchunks,uint64_t ndupextents,uint64_t unique_capacity)560 static void show_stat(uint64_t nextents, uint64_t nchunks, uint64_t ndupextents,
561 uint64_t unique_capacity)
562 {
563 double perc, ratio;
564 const char *unit;
565 uint64_t uc_human;
566
567 printf("Extents=%lu, Unique extents=%lu", (unsigned long) nextents,
568 (unsigned long) nchunks);
569 if (!bloom)
570 printf(" Duplicated extents=%lu", (unsigned long) ndupextents);
571 printf("\n");
572
573 if (nchunks) {
574 ratio = (double) nextents / (double) nchunks;
575 printf("De-dupe ratio: 1:%3.2f\n", ratio - 1.0);
576 } else {
577 printf("De-dupe ratio: 1:infinite\n");
578 }
579
580 if (ndupextents) {
581 printf("De-dupe working set at least: %3.2f%%\n",
582 100.0 * (double) ndupextents / (double) nextents);
583 }
584
585 perc = 1.00 - ((double) nchunks / (double) nextents);
586 perc *= 100.0;
587 printf("Fio setting: dedupe_percentage=%u\n", (int) (perc + 0.50));
588
589
590 if (compression) {
591 uc_human = bytes_to_human_readable_unit(unique_capacity, &unit);
592 printf("Unique capacity %lu%s\n", (unsigned long) uc_human, unit);
593 }
594 }
595
iter_rb_tree(uint64_t * nextents,uint64_t * nchunks,uint64_t * ndupextents)596 static void iter_rb_tree(uint64_t *nextents, uint64_t *nchunks, uint64_t *ndupextents)
597 {
598 struct fio_rb_node *n;
599 *nchunks = *nextents = *ndupextents = 0;
600
601 n = rb_first(&rb_root);
602 if (!n)
603 return;
604
605 do {
606 struct chunk *c;
607
608 c = rb_entry(n, struct chunk, rb_node);
609 (*nchunks)++;
610 *nextents += c->count;
611 *ndupextents += (c->count > 1);
612
613 if (dump_output)
614 show_chunk(c);
615
616 } while ((n = rb_next(n)) != NULL);
617 }
618
/*
 * Print command line help. Note every option takes a numeric argument
 * (getopt string "b:t:d:o:c:p:B:C:"), e.g. "-o 1" to enable O_DIRECT.
 * Always returns 1 so callers can "return usage(argv)" directly.
 */
static int usage(char *argv[])
{
	log_err("Check for dedupable blocks on a device/file\n\n");
	log_err("%s: [options] <device or file>\n", argv[0]);
	log_err("\t-b\tChunk size to use\n");
	log_err("\t-t\tNumber of threads to use\n");
	log_err("\t-d\tFull extent/chunk debug output\n");
	log_err("\t-o\tUse O_DIRECT\n");
	log_err("\t-c\tFull collision check\n");
	log_err("\t-B\tUse probabilistic bloom filter\n");
	log_err("\t-p\tPrint progress indicator\n");
	log_err("\t-C\tCalculate compressible size\n");
	return 1;
}
633
/*
 * Entry point: parse options, run the scan over argv[optind], and print
 * the summary. Returns non-zero on setup or scan failure.
 */
int main(int argc, char *argv[])
{
	uint64_t nextents = 0, nchunks = 0, ndupextents = 0, unique_capacity;
	int c, ret;

	/* fio library bootstrap; must precede cpus_online()/sinit() use */
	arch_init(argv);
	debug_init();

	/* Every option takes a numeric argument, e.g. "-c 1" */
	while ((c = getopt(argc, argv, "b:t:d:o:c:p:B:C:")) != -1) {
		switch (c) {
		case 'b':
			blocksize = atoi(optarg);
			break;
		case 't':
			num_threads = atoi(optarg);
			break;
		case 'd':
			dump_output = atoi(optarg);
			break;
		case 'o':
			odirect = atoi(optarg);
			break;
		case 'c':
			collision_check = atoi(optarg);
			break;
		case 'p':
			print_progress = atoi(optarg);
			break;
		case 'B':
			use_bloom = atoi(optarg);
			break;
		case 'C':
			compression = atoi(optarg);
			break;
		case '?':
		default:
			return usage(argv);
		}
	}

	/*
	 * Collision checking, chunk dumping, and compression accounting all
	 * need exact per-chunk state, which the bloom fast path doesn't keep.
	 */
	if (collision_check || dump_output || compression)
		use_bloom = 0;

	if (!num_threads)
		num_threads = cpus_online();

	/* A target file/device argument is mandatory */
	if (argc == optind)
		return usage(argv);

	sinit();

	rb_root = RB_ROOT;
	rb_lock = fio_sem_init(FIO_SEM_UNLOCKED);

	ret = dedupe_check(argv[optind], &nextents, &nchunks, &unique_capacity);

	if (!ret) {
		/* In tree mode the counts come from a final tree walk */
		if (!bloom)
			iter_rb_tree(&nextents, &nchunks, &ndupextents);

		show_stat(nextents, nchunks, ndupextents, unique_capacity);
	}

	fio_sem_remove(rb_lock);
	if (bloom)
		bloom_free(bloom);
	scleanup();
	return ret;
}
703