1 /*  This file is part of Jellyfish.
2 
3     This work is dual-licensed under 3-Clause BSD License or GPL 3.0.
4     You can choose between one of them if you use this work.
5 
6 `SPDX-License-Identifier: BSD-3-Clause OR  GPL-3.0`
7 */
8 
9 #include <jellyfish/merge_files.hpp>
10 
11 #include <iostream>
12 #include <fstream>
13 #include <vector>
14 #include <memory>
15 #include <string>
16 
17 #include <jellyfish/err.hpp>
18 #include <jellyfish/misc.hpp>
19 #include <jellyfish/mer_heap.hpp>
20 #include <jellyfish/jellyfish.hpp>
21 #include <jellyfish/rectangular_binary_matrix.hpp>
22 #include <jellyfish/cpp_array.hpp>
23 
24 namespace err = jellyfish::err;
25 
26 using jellyfish::file_header;
27 using jellyfish::RectangularBinaryMatrix;
28 using jellyfish::mer_dna;
29 using jellyfish::cpp_array;
30 typedef std::unique_ptr<binary_reader> binary_reader_ptr;
31 typedef std::unique_ptr<text_reader> text_reader_ptr;
32 
33 struct file_info {
34   std::ifstream is;
35   file_header   header;
36 
file_infofile_info37   file_info(const char* path) :
38   is(path),
39   header(is)
40   { }
41 };
42 typedef std::unique_ptr<RectangularBinaryMatrix> matrix_ptr;
43 
44 template<typename reader_type, typename writer_type>
do_merge(cpp_array<file_info> & files,std::ostream & out,writer_type & writer,uint64_t min,uint64_t max,merge_op op)45 void do_merge(cpp_array<file_info>& files, std::ostream& out, writer_type& writer,
46               uint64_t min, uint64_t max, merge_op op) {
47   cpp_array<reader_type> readers(files.size());
48   typedef jellyfish::mer_heap::heap<mer_dna, reader_type> heap_type;
49   typedef typename heap_type::const_item_t heap_item;
50   const uint64_t nb_files = files.size();
51   heap_type heap(files.size());
52 
53   for(size_t i = 0; i < files.size(); ++i) {
54     readers.init(i, files[i].is, &files[i].header);
55     if(readers[i].next())
56       heap.push(readers[i]);
57   }
58 
59   uint64_t inter = 0, winter = 0, union_ = 0, wunion = 0;
60   heap_item head = heap.head();
61   mer_dna   key;
62   while(heap.is_not_empty()) {
63     key = head->key_;
64     uint64_t sum = 0;
65     uint64_t maxc = 0;
66     uint64_t minc = std::numeric_limits<uint64_t>::max();
67     uint64_t files_present = 0;
68     do {
69       ++files_present;
70       sum += head->val_;
71       minc  = std::min(minc, head->val_);
72       maxc  = std::max(maxc, head->val_);
73       heap.pop();
74       if(head->it_->next())
75         heap.push(*head->it_);
76       head = heap.head();
77     } while(head->key_ == key && heap.is_not_empty());
78     if(files_present < nb_files) // Not present in some file -> count assumed 0
79       minc = 0;
80     if(op != JACCARD) {
81       uint64_t val = 0;
82       switch(op) {
83       case SUM: val = sum; break;
84       case MIN: val = minc; break;
85       case MAX: val = maxc; break;
86       default: break;
87       }
88       if(val >= min && val <= max)
89         writer.write(out, key, val);
90     } else {
91       inter  += minc > 0;
92       winter += minc;
93       union_ += 1;
94       wunion += maxc;
95     }
96   }
97 
98   if(op == JACCARD) {
99     out << "Jaccard  " << (double)inter / (double)union_ << '\n'
100         << "wJaccard " << (double)winter / (double)wunion << '\n';
101   }
102 }
103 
104 // Merge files. Throws an error if unsuccessful.
merge_files(std::vector<const char * > input_files,const char * out_file,file_header & out_header,uint64_t min,uint64_t max,merge_op op)105 void merge_files(std::vector<const char*> input_files,
106                  const char* out_file,
107                  file_header& out_header,
108                  uint64_t min, uint64_t max,
109                  merge_op op) {
110   unsigned int key_len            = 0;
111   size_t       max_reprobe_offset = 0;
112   size_t       size               = 0;
113   unsigned int out_counter_len    = std::numeric_limits<unsigned int>::max();
114   std::string  format;
115   matrix_ptr   matrix;
116 
117   cpp_array<file_info> files(input_files.size());
118 
119   // create an iterator for each hash file
120   for(size_t i = 0; i < files.size(); i++) {
121     files.init(i, input_files[i]);
122     if(!files[i].is.good())
123       throw MergeError(err::msg() << "Failed to open input file '" << input_files[i] << "'");
124 
125     file_header& h = files[i].header;
126     if(i == 0) {
127       key_len            = h.key_len();
128       max_reprobe_offset = h.max_reprobe_offset();
129       size               = h.size();
130       matrix.reset(new RectangularBinaryMatrix(h.matrix()));
131       out_header.size(size);
132       out_header.key_len(key_len);
133       format = h.format();
134       out_header.matrix(*matrix);
135       out_header.max_reprobe(h.max_reprobe());
136       size_t reprobes[h.max_reprobe() + 1];
137       h.get_reprobes(reprobes);
138       out_header.set_reprobes(reprobes);
139       out_counter_len = std::min(out_counter_len, h.counter_len());
140     } else {
141       if(format != h.format())
142         throw MergeError(err::msg() << "Can't merge files with different formats (" << format << ", " << h.format() << ")");
143       if(h.key_len() != key_len)
144         throw MergeError(err::msg() << "Can't merge hashes of different key lengths (" << key_len << ", " << h.key_len() << ")");
145       if(h.max_reprobe_offset() != max_reprobe_offset)
146         throw MergeError("Can't merge hashes with different reprobing strategies");
147       if(h.size() != size)
148         throw MergeError(err::msg() << "Can't merge hash with different size (" << size << ", " << h.size() << ")");
149       if(h.matrix() != *matrix)
150         throw MergeError("Can't merge hash with different hash function");
151     }
152   }
153   mer_dna::k(key_len / 2);
154 
155   std::ofstream out;
156   out.open(out_file);
157   if(!out.good())
158     throw MergeError(err::msg() << "Can't open out file '" << out_file << "'");
159   if(op != JACCARD)
160     out_header.format(format);
161 
162   if(!format.compare(binary_dumper::format)) {
163     out_header.counter_len(out_counter_len);
164     if(op != JACCARD)
165       out_header.write(out);
166     binary_writer writer(out_counter_len, key_len);
167     do_merge<binary_reader, binary_writer>(files, out, writer, min, max, op);
168   } else if(!format.compare(text_dumper::format)) {
169     if(op != JACCARD)
170       out_header.write(out);
171     text_writer writer;
172     do_merge<text_reader, text_writer>(files, out, writer, min, max, op);
173   } else {
174     throw MergeError(err::msg() << "Unknown format '" << format << "'");
175   }
176 }
177