1 /* This file is part of Jellyfish.
2
3 This work is dual-licensed under 3-Clause BSD License or GPL 3.0.
4 You can choose between one of them if you use this work.
5
6 `SPDX-License-Identifier: BSD-3-Clause OR GPL-3.0`
7 */
8
9 #include <jellyfish/merge_files.hpp>
10
#include <algorithm>
#include <fstream>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <vector>
16
17 #include <jellyfish/err.hpp>
18 #include <jellyfish/misc.hpp>
19 #include <jellyfish/mer_heap.hpp>
20 #include <jellyfish/jellyfish.hpp>
21 #include <jellyfish/rectangular_binary_matrix.hpp>
22 #include <jellyfish/cpp_array.hpp>
23
24 namespace err = jellyfish::err;
25
26 using jellyfish::file_header;
27 using jellyfish::RectangularBinaryMatrix;
28 using jellyfish::mer_dna;
29 using jellyfish::cpp_array;
30 typedef std::unique_ptr<binary_reader> binary_reader_ptr;
31 typedef std::unique_ptr<text_reader> text_reader_ptr;
32
33 struct file_info {
34 std::ifstream is;
35 file_header header;
36
file_infofile_info37 file_info(const char* path) :
38 is(path),
39 header(is)
40 { }
41 };
42 typedef std::unique_ptr<RectangularBinaryMatrix> matrix_ptr;
43
44 template<typename reader_type, typename writer_type>
do_merge(cpp_array<file_info> & files,std::ostream & out,writer_type & writer,uint64_t min,uint64_t max,merge_op op)45 void do_merge(cpp_array<file_info>& files, std::ostream& out, writer_type& writer,
46 uint64_t min, uint64_t max, merge_op op) {
47 cpp_array<reader_type> readers(files.size());
48 typedef jellyfish::mer_heap::heap<mer_dna, reader_type> heap_type;
49 typedef typename heap_type::const_item_t heap_item;
50 const uint64_t nb_files = files.size();
51 heap_type heap(files.size());
52
53 for(size_t i = 0; i < files.size(); ++i) {
54 readers.init(i, files[i].is, &files[i].header);
55 if(readers[i].next())
56 heap.push(readers[i]);
57 }
58
59 uint64_t inter = 0, winter = 0, union_ = 0, wunion = 0;
60 heap_item head = heap.head();
61 mer_dna key;
62 while(heap.is_not_empty()) {
63 key = head->key_;
64 uint64_t sum = 0;
65 uint64_t maxc = 0;
66 uint64_t minc = std::numeric_limits<uint64_t>::max();
67 uint64_t files_present = 0;
68 do {
69 ++files_present;
70 sum += head->val_;
71 minc = std::min(minc, head->val_);
72 maxc = std::max(maxc, head->val_);
73 heap.pop();
74 if(head->it_->next())
75 heap.push(*head->it_);
76 head = heap.head();
77 } while(head->key_ == key && heap.is_not_empty());
78 if(files_present < nb_files) // Not present in some file -> count assumed 0
79 minc = 0;
80 if(op != JACCARD) {
81 uint64_t val = 0;
82 switch(op) {
83 case SUM: val = sum; break;
84 case MIN: val = minc; break;
85 case MAX: val = maxc; break;
86 default: break;
87 }
88 if(val >= min && val <= max)
89 writer.write(out, key, val);
90 } else {
91 inter += minc > 0;
92 winter += minc;
93 union_ += 1;
94 wunion += maxc;
95 }
96 }
97
98 if(op == JACCARD) {
99 out << "Jaccard " << (double)inter / (double)union_ << '\n'
100 << "wJaccard " << (double)winter / (double)wunion << '\n';
101 }
102 }
103
104 // Merge files. Throws an error if unsuccessful.
merge_files(std::vector<const char * > input_files,const char * out_file,file_header & out_header,uint64_t min,uint64_t max,merge_op op)105 void merge_files(std::vector<const char*> input_files,
106 const char* out_file,
107 file_header& out_header,
108 uint64_t min, uint64_t max,
109 merge_op op) {
110 unsigned int key_len = 0;
111 size_t max_reprobe_offset = 0;
112 size_t size = 0;
113 unsigned int out_counter_len = std::numeric_limits<unsigned int>::max();
114 std::string format;
115 matrix_ptr matrix;
116
117 cpp_array<file_info> files(input_files.size());
118
119 // create an iterator for each hash file
120 for(size_t i = 0; i < files.size(); i++) {
121 files.init(i, input_files[i]);
122 if(!files[i].is.good())
123 throw MergeError(err::msg() << "Failed to open input file '" << input_files[i] << "'");
124
125 file_header& h = files[i].header;
126 if(i == 0) {
127 key_len = h.key_len();
128 max_reprobe_offset = h.max_reprobe_offset();
129 size = h.size();
130 matrix.reset(new RectangularBinaryMatrix(h.matrix()));
131 out_header.size(size);
132 out_header.key_len(key_len);
133 format = h.format();
134 out_header.matrix(*matrix);
135 out_header.max_reprobe(h.max_reprobe());
136 size_t reprobes[h.max_reprobe() + 1];
137 h.get_reprobes(reprobes);
138 out_header.set_reprobes(reprobes);
139 out_counter_len = std::min(out_counter_len, h.counter_len());
140 } else {
141 if(format != h.format())
142 throw MergeError(err::msg() << "Can't merge files with different formats (" << format << ", " << h.format() << ")");
143 if(h.key_len() != key_len)
144 throw MergeError(err::msg() << "Can't merge hashes of different key lengths (" << key_len << ", " << h.key_len() << ")");
145 if(h.max_reprobe_offset() != max_reprobe_offset)
146 throw MergeError("Can't merge hashes with different reprobing strategies");
147 if(h.size() != size)
148 throw MergeError(err::msg() << "Can't merge hash with different size (" << size << ", " << h.size() << ")");
149 if(h.matrix() != *matrix)
150 throw MergeError("Can't merge hash with different hash function");
151 }
152 }
153 mer_dna::k(key_len / 2);
154
155 std::ofstream out;
156 out.open(out_file);
157 if(!out.good())
158 throw MergeError(err::msg() << "Can't open out file '" << out_file << "'");
159 if(op != JACCARD)
160 out_header.format(format);
161
162 if(!format.compare(binary_dumper::format)) {
163 out_header.counter_len(out_counter_len);
164 if(op != JACCARD)
165 out_header.write(out);
166 binary_writer writer(out_counter_len, key_len);
167 do_merge<binary_reader, binary_writer>(files, out, writer, min, max, op);
168 } else if(!format.compare(text_dumper::format)) {
169 if(op != JACCARD)
170 out_header.write(out);
171 text_writer writer;
172 do_merge<text_reader, text_writer>(files, out, writer, min, max, op);
173 } else {
174 throw MergeError(err::msg() << "Unknown format '" << format << "'");
175 }
176 }
177