1 /* This file is part of Jellyfish.
2
3 Jellyfish is free software: you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation, either version 3 of the License, or
6 (at your option) any later version.
7
8 Jellyfish is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with Jellyfish. If not, see <http://www.gnu.org/licenses/>.
15 */
16
17 #include <cstdlib>
18 #include <unistd.h>
19 #include <assert.h>
20 #include <signal.h>
21
22 #include <iostream>
23 #include <fstream>
24 #include <string>
25 #include <vector>
26 #include <map>
27 #include <sstream>
28 #include <memory>
29 #include <chrono>
30
31 #include <jellyfish/err.hpp>
32 #include <jellyfish/thread_exec.hpp>
33 #include <jellyfish/hash_counter.hpp>
34 #include <jellyfish/locks_pthread.hpp>
35 #include <jellyfish/stream_manager.hpp>
36 #include <jellyfish/mer_overlap_sequence_parser.hpp>
37 #include <jellyfish/whole_sequence_parser.hpp>
38 #include <jellyfish/mer_iterator.hpp>
39 #include <jellyfish/mer_qual_iterator.hpp>
40 #include <jellyfish/jellyfish.hpp>
41 #include <jellyfish/merge_files.hpp>
42 #include <jellyfish/mer_dna_bloom_counter.hpp>
43 #include <jellyfish/generator_manager.hpp>
44 #include <sub_commands/count_main_cmdline.hpp>
45
46 static count_main_cmdline args; // Command line switches and arguments
47
48 namespace err = jellyfish::err;
49
50 using std::chrono::system_clock;
51 using std::chrono::duration;
52 using std::chrono::duration_cast;
53
54 template<typename DtnType>
as_seconds(DtnType dtn)55 inline double as_seconds(DtnType dtn) { return duration_cast<duration<double>>(dtn).count(); }
56
57 using jellyfish::mer_dna;
58 using jellyfish::mer_dna_bloom_counter;
59 using jellyfish::mer_dna_bloom_filter;
60 typedef std::vector<const char*> file_vector;
61
62 // Types for parsing arbitrary sequence ignoring quality scores
63 typedef jellyfish::mer_overlap_sequence_parser<jellyfish::stream_manager<file_vector::const_iterator> > sequence_parser;
64 typedef jellyfish::mer_iterator<sequence_parser, mer_dna> mer_iterator;
65
66 // Types for parsing reads with quality score. Interface match type
67 // above.
68 class sequence_qual_parser :
69 public jellyfish::whole_sequence_parser<jellyfish::stream_manager<file_vector::const_iterator> >
70 {
71 typedef jellyfish::stream_manager<file_vector::const_iterator> StreamIterator;
72 typedef jellyfish::whole_sequence_parser<StreamIterator> super;
73 public:
sequence_qual_parser(uint16_t mer_len,uint32_t max_producers,uint32_t size,size_t buf_size,StreamIterator & streams)74 sequence_qual_parser(uint16_t mer_len, uint32_t max_producers, uint32_t size, size_t buf_size,
75 StreamIterator& streams) :
76 super(size, 100, max_producers, streams)
77 { }
78 };
79
80 class mer_qual_iterator : public jellyfish::mer_qual_iterator<sequence_qual_parser, mer_dna> {
81 typedef jellyfish::mer_qual_iterator<sequence_qual_parser, mer_dna> super;
82 public:
mer_qual_iterator(sequence_qual_parser & parser,bool canonical=false)83 mer_qual_iterator(sequence_qual_parser& parser, bool canonical = false) :
84 super(parser, args.min_qual_char_arg[0], canonical)
85 { }
86 };
87
88 // k-mer filters. Organized in a linked list, interpreted as a &&
89 // (logical and). I.e. all filter must return true for the result to
90 // be true. By default, filter returns true.
91 struct filter {
92 filter* prev_;
filterfilter93 filter(filter* prev = 0) : prev_(prev) { }
~filterfilter94 virtual ~filter() { }
operator ()filter95 virtual bool operator()(const mer_dna& x) { return and_res(true, x); }
and_resfilter96 bool and_res(bool r, const mer_dna& x) const {
97 return r ? (prev_ ? (*prev_)(x) : true) : false;
98 }
99 };
100
101 struct filter_bc : public filter {
102 const mer_dna_bloom_counter& counter_;
filter_bcfilter_bc103 filter_bc(const mer_dna_bloom_counter& counter, filter* prev = 0) :
104 filter(prev),
105 counter_(counter)
106 { }
operator ()filter_bc107 bool operator()(const mer_dna& m) {
108 unsigned int c = counter_.check(m);
109 return and_res(c > 1, m);
110 }
111 };
112
113 struct filter_bf : public filter {
114 mer_dna_bloom_filter& bf_;
filter_bffilter_bf115 filter_bf(mer_dna_bloom_filter& bf, filter* prev = 0) :
116 filter(prev),
117 bf_(bf)
118 { }
operator ()filter_bf119 bool operator()(const mer_dna& m) {
120 unsigned int c = bf_.insert(m);
121 return and_res(c > 0, m);
122 }
123 };
124
125 enum OPERATION { COUNT, PRIME, UPDATE };
126 template<typename PathIterator, typename MerIteratorType, typename ParserType>
127 class mer_counter_base : public jellyfish::thread_exec {
128 int nb_threads_;
129 mer_hash& ary_;
130 jellyfish::stream_manager<PathIterator> streams_;
131 ParserType parser_;
132 filter* filter_;
133 OPERATION op_;
134
135 public:
mer_counter_base(int nb_threads,mer_hash & ary,PathIterator file_begin,PathIterator file_end,PathIterator pipe_begin,PathIterator pipe_end,uint32_t concurent_files,OPERATION op,filter * filter=new struct filter)136 mer_counter_base(int nb_threads, mer_hash& ary,
137 PathIterator file_begin, PathIterator file_end,
138 PathIterator pipe_begin, PathIterator pipe_end,
139 uint32_t concurent_files,
140 OPERATION op, filter* filter = new struct filter) :
141 ary_(ary),
142 streams_(file_begin, file_end, pipe_begin, pipe_end, concurent_files),
143 parser_(mer_dna::k(), streams_.nb_streams(), 3 * nb_threads, 4096, streams_),
144 filter_(filter),
145 op_(op)
146 { }
147
start(int thid)148 virtual void start(int thid) {
149 size_t count = 0;
150 MerIteratorType mers(parser_, args.canonical_flag);
151
152 switch(op_) {
153 case COUNT:
154 for( ; mers; ++mers) {
155 if((*filter_)(*mers))
156 ary_.add(*mers, 1);
157 ++count;
158 }
159 break;
160
161 case PRIME:
162 for( ; mers; ++mers) {
163 if((*filter_)(*mers))
164 ary_.set(*mers);
165 ++count;
166 }
167 break;
168
169 case UPDATE:
170 mer_dna tmp;
171 for( ; mers; ++mers) {
172 if((*filter_)(*mers))
173 ary_.update_add(*mers, 1, tmp);
174 ++count;
175 }
176 break;
177 }
178
179 ary_.done();
180 }
181 };
182
183 // Counter with and without quality value
184 typedef mer_counter_base<file_vector::const_iterator, mer_iterator, sequence_parser> mer_counter;
185 typedef mer_counter_base<file_vector::const_iterator, mer_qual_iterator, sequence_qual_parser> mer_qual_counter;
186
load_bloom_filter(const char * path)187 mer_dna_bloom_counter* load_bloom_filter(const char* path) {
188 std::ifstream in(path, std::ios::in|std::ios::binary);
189 jellyfish::file_header header(in);
190 if(!in.good())
191 err::die(err::msg() << "Failed to parse bloom filter file '" << path << "'");
192 if(header.format() != "bloomcounter")
193 err::die(err::msg() << "Invalid format '" << header.format() << "'. Expected 'bloomcounter'");
194 if(header.key_len() != mer_dna::k() * 2)
195 err::die("Invalid mer length in bloom filter");
196 jellyfish::hash_pair<mer_dna> fns(header.matrix(1), header.matrix(2));
197 auto res = new mer_dna_bloom_counter(header.size(), header.nb_hashes(), in, fns);
198 if(!in.good())
199 err::die("Bloom filter file is truncated");
200 in.close();
201 return res;
202 }
203
204 // If get a termination signal, kill the manager and then kill myself.
205 static pid_t manager_pid = 0;
signal_handler(int sig)206 static void signal_handler(int sig) {
207 if(manager_pid)
208 kill(manager_pid, SIGTERM);
209 signal(sig, SIG_DFL);
210 kill(getpid(), sig);
211 _exit(EXIT_FAILURE); // Should not be reached
212 }
213
count_main(int argc,char * argv[])214 int count_main(int argc, char *argv[])
215 {
216 auto start_time = system_clock::now();
217
218 jellyfish::file_header header;
219 header.fill_standard();
220 header.set_cmdline(argc, argv);
221
222 args.parse(argc, argv);
223
224 if(args.min_qual_char_given && args.min_qual_char_arg.size() != 1)
225 count_main_cmdline::error("[-Q, --min-qual-char] must be one character.");
226
227 mer_dna::k(args.mer_len_arg);
228
229 std::unique_ptr<jellyfish::generator_manager> generator_manager;
230 if(args.generator_given) {
231 auto gm =
232 new jellyfish::generator_manager(args.generator_arg, args.Generators_arg,
233 args.shell_given ? args.shell_arg : (const char*)0);
234 generator_manager.reset(gm);
235 generator_manager->start();
236 manager_pid = generator_manager->pid();
237 struct sigaction act;
238 memset(&act, '\0', sizeof(act));
239 act.sa_handler = signal_handler;
240 assert(sigaction(SIGTERM, &act, 0) == 0);
241 }
242
243 header.canonical(args.canonical_flag);
244 mer_hash ary(args.size_arg, args.mer_len_arg * 2, args.counter_len_arg, args.threads_arg, args.reprobes_arg);
245 if(args.disk_flag)
246 ary.do_size_doubling(false);
247
248 std::unique_ptr<jellyfish::dumper_t<mer_array> > dumper;
249 if(args.text_flag)
250 dumper.reset(new text_dumper(args.threads_arg, args.output_arg, &header));
251 else
252 dumper.reset(new binary_dumper(args.out_counter_len_arg, ary.key_len(), args.threads_arg, args.output_arg, &header));
253 ary.dumper(dumper.get());
254
255 auto after_init_time = system_clock::now();
256
257 OPERATION do_op = COUNT;
258 if(args.if_given) {
259 mer_counter counter(args.threads_arg, ary,
260 args.if_arg.begin(), args.if_arg.end(),
261 args.if_arg.end(), args.if_arg.end(), // no multi pipes
262 args.Files_arg, PRIME);
263 counter.exec_join(args.threads_arg);
264 do_op = UPDATE;
265 }
266
267 // Iterators to the multi pipe paths. If no generator manager,
268 // generate an empty range.
269 auto pipes_begin = generator_manager.get() ? generator_manager->pipes().begin() : args.file_arg.end();
270 auto pipes_end = (bool)generator_manager ? generator_manager->pipes().end() : args.file_arg.end();
271
272 // Bloom counter read from file to filter out low frequency
273 // k-mers. Two pass algorithm.
274 std::unique_ptr<filter> mer_filter(new filter);
275 std::unique_ptr<mer_dna_bloom_counter> bc;
276 if(args.bc_given) {
277 bc.reset(load_bloom_filter(args.bc_arg));
278 mer_filter.reset(new filter_bc(*bc));
279 }
280
281 // Bloom filter to filter out low frequency k-mers. One pass
282 // algorithm.
283 std::unique_ptr<mer_dna_bloom_filter> bf;
284 if(args.bf_size_given) {
285 bf.reset(new mer_dna_bloom_filter(args.bf_fp_arg, args.bf_size_arg));
286 mer_filter.reset(new filter_bf(*bf));
287 }
288
289 if(args.min_qual_char_given) {
290 mer_qual_counter counter(args.threads_arg, ary,
291 args.file_arg.begin(), args.file_arg.end(),
292 pipes_begin, pipes_end,
293 args.Files_arg,
294 do_op, mer_filter.get());
295 counter.exec_join(args.threads_arg);
296 } else {
297 mer_counter counter(args.threads_arg, ary,
298 args.file_arg.begin(), args.file_arg.end(),
299 pipes_begin, pipes_end,
300 args.Files_arg,
301 do_op, mer_filter.get());
302 counter.exec_join(args.threads_arg);
303 }
304
305 // If we have a manager, wait for it
306 if(generator_manager) {
307 signal(SIGTERM, SIG_DFL);
308 manager_pid = 0;
309 if(!generator_manager->wait())
310 err::die("Some generator commands failed");
311 generator_manager.reset();
312 }
313
314 auto after_count_time = system_clock::now();
315
316 // If no intermediate files, dump directly into output file. If not, will do a round of merging
317 if(!args.no_write_flag) {
318 if(dumper->nb_files() == 0) {
319 dumper->one_file(true);
320 if(args.lower_count_given)
321 dumper->min(args.lower_count_arg);
322 if(args.upper_count_given)
323 dumper->max(args.upper_count_arg);
324 dumper->dump(ary.ary());
325 } else {
326 dumper->dump(ary.ary());
327 if(!args.no_merge_flag) {
328 std::vector<const char*> files = dumper->file_names_cstr();
329 uint64_t min = args.lower_count_given ? args.lower_count_arg : 0;
330 uint64_t max = args.upper_count_given ? args.upper_count_arg : std::numeric_limits<uint64_t>::max();
331 try {
332 merge_files(files, args.output_arg, header, min, max);
333 } catch(MergeError e) {
334 err::die(err::msg() << e.what());
335 }
336 if(!args.no_unlink_flag) {
337 for(int i =0; i < dumper->nb_files(); ++i)
338 unlink(files[i]);
339 }
340 } // if(!args.no_merge_flag
341 } // if(!args.no_merge_flag
342 }
343
344 auto after_dump_time = system_clock::now();
345
346 if(args.timing_given) {
347 std::ofstream timing_file(args.timing_arg);
348 timing_file << "Init " << as_seconds(after_init_time - start_time) << "\n"
349 << "Counting " << as_seconds(after_count_time - after_init_time) << "\n"
350 << "Writing " << as_seconds(after_dump_time - after_count_time) << "\n";
351 }
352
353 return 0;
354 }
355