1 /*  This file is part of Jellyfish.
2 
3     Jellyfish is free software: you can redistribute it and/or modify
4     it under the terms of the GNU General Public License as published by
5     the Free Software Foundation, either version 3 of the License, or
6     (at your option) any later version.
7 
8     Jellyfish is distributed in the hope that it will be useful,
9     but WITHOUT ANY WARRANTY; without even the implied warranty of
10     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11     GNU General Public License for more details.
12 
13     You should have received a copy of the GNU General Public License
14     along with Jellyfish.  If not, see <http://www.gnu.org/licenses/>.
15 */
16 
17 #include <cstdlib>
18 #include <unistd.h>
19 #include <assert.h>
20 #include <signal.h>
21 
22 #include <iostream>
23 #include <fstream>
24 #include <string>
25 #include <vector>
26 #include <map>
27 #include <sstream>
28 #include <memory>
29 #include <chrono>
30 
31 #include <jellyfish/err.hpp>
32 #include <jellyfish/thread_exec.hpp>
33 #include <jellyfish/hash_counter.hpp>
34 #include <jellyfish/locks_pthread.hpp>
35 #include <jellyfish/stream_manager.hpp>
36 #include <jellyfish/mer_overlap_sequence_parser.hpp>
37 #include <jellyfish/whole_sequence_parser.hpp>
38 #include <jellyfish/mer_iterator.hpp>
39 #include <jellyfish/mer_qual_iterator.hpp>
40 #include <jellyfish/jellyfish.hpp>
41 #include <jellyfish/merge_files.hpp>
42 #include <jellyfish/mer_dna_bloom_counter.hpp>
43 #include <jellyfish/generator_manager.hpp>
44 #include <sub_commands/count_main_cmdline.hpp>
45 
46 static count_main_cmdline args; // Command line switches and arguments
47 
48 namespace err = jellyfish::err;
49 
50 using std::chrono::system_clock;
51 using std::chrono::duration;
52 using std::chrono::duration_cast;
53 
54 template<typename DtnType>
as_seconds(DtnType dtn)55 inline double as_seconds(DtnType dtn) { return duration_cast<duration<double>>(dtn).count(); }
56 
57 using jellyfish::mer_dna;
58 using jellyfish::mer_dna_bloom_counter;
59 using jellyfish::mer_dna_bloom_filter;
60 typedef std::vector<const char*> file_vector;
61 
62 // Types for parsing arbitrary sequence ignoring quality scores
63 typedef jellyfish::mer_overlap_sequence_parser<jellyfish::stream_manager<file_vector::const_iterator> > sequence_parser;
64 typedef jellyfish::mer_iterator<sequence_parser, mer_dna> mer_iterator;
65 
66 // Types for parsing reads with quality score. Interface match type
67 // above.
68 class sequence_qual_parser :
69   public jellyfish::whole_sequence_parser<jellyfish::stream_manager<file_vector::const_iterator> >
70 {
71   typedef jellyfish::stream_manager<file_vector::const_iterator> StreamIterator;
72   typedef jellyfish::whole_sequence_parser<StreamIterator> super;
73 public:
sequence_qual_parser(uint16_t mer_len,uint32_t max_producers,uint32_t size,size_t buf_size,StreamIterator & streams)74   sequence_qual_parser(uint16_t mer_len, uint32_t max_producers, uint32_t size, size_t buf_size,
75                        StreamIterator& streams) :
76     super(size, 100, max_producers, streams)
77   { }
78 };
79 
80 class mer_qual_iterator : public jellyfish::mer_qual_iterator<sequence_qual_parser, mer_dna> {
81   typedef jellyfish::mer_qual_iterator<sequence_qual_parser, mer_dna> super;
82 public:
mer_qual_iterator(sequence_qual_parser & parser,bool canonical=false)83   mer_qual_iterator(sequence_qual_parser& parser, bool canonical = false) :
84     super(parser, args.min_qual_char_arg[0], canonical)
85   { }
86 };
87 
88 // k-mer filters. Organized in a linked list, interpreted as a &&
89 // (logical and). I.e. all filter must return true for the result to
90 // be true. By default, filter returns true.
91 struct filter {
92   filter* prev_;
filterfilter93   filter(filter* prev = 0) : prev_(prev) { }
~filterfilter94   virtual ~filter() { }
operator ()filter95   virtual bool operator()(const mer_dna& x) { return and_res(true, x); }
and_resfilter96   bool and_res(bool r, const mer_dna& x) const {
97     return r ? (prev_ ? (*prev_)(x) : true) : false;
98   }
99 };
100 
101 struct filter_bc : public filter {
102   const mer_dna_bloom_counter& counter_;
filter_bcfilter_bc103   filter_bc(const mer_dna_bloom_counter& counter, filter* prev = 0) :
104     filter(prev),
105     counter_(counter)
106   { }
operator ()filter_bc107   bool operator()(const mer_dna& m) {
108     unsigned int c = counter_.check(m);
109     return and_res(c > 1, m);
110   }
111 };
112 
113 struct filter_bf : public filter {
114   mer_dna_bloom_filter& bf_;
filter_bffilter_bf115   filter_bf(mer_dna_bloom_filter& bf, filter* prev = 0) :
116     filter(prev),
117     bf_(bf)
118   { }
operator ()filter_bf119   bool operator()(const mer_dna& m) {
120     unsigned int c = bf_.insert(m);
121     return and_res(c > 0, m);
122   }
123 };
124 
125 enum OPERATION { COUNT, PRIME, UPDATE };
126 template<typename PathIterator, typename MerIteratorType, typename ParserType>
127 class mer_counter_base : public jellyfish::thread_exec {
128   int                                     nb_threads_;
129   mer_hash&                               ary_;
130   jellyfish::stream_manager<PathIterator> streams_;
131   ParserType                              parser_;
132   filter*                                 filter_;
133   OPERATION                               op_;
134 
135 public:
mer_counter_base(int nb_threads,mer_hash & ary,PathIterator file_begin,PathIterator file_end,PathIterator pipe_begin,PathIterator pipe_end,uint32_t concurent_files,OPERATION op,filter * filter=new struct filter)136   mer_counter_base(int nb_threads, mer_hash& ary,
137                    PathIterator file_begin, PathIterator file_end,
138                    PathIterator pipe_begin, PathIterator pipe_end,
139                    uint32_t concurent_files,
140                    OPERATION op, filter* filter = new struct filter) :
141     ary_(ary),
142     streams_(file_begin, file_end, pipe_begin, pipe_end, concurent_files),
143     parser_(mer_dna::k(), streams_.nb_streams(), 3 * nb_threads, 4096, streams_),
144     filter_(filter),
145     op_(op)
146   { }
147 
start(int thid)148   virtual void start(int thid) {
149     size_t count = 0;
150     MerIteratorType mers(parser_, args.canonical_flag);
151 
152     switch(op_) {
153      case COUNT:
154       for( ; mers; ++mers) {
155         if((*filter_)(*mers))
156           ary_.add(*mers, 1);
157         ++count;
158       }
159       break;
160 
161     case PRIME:
162       for( ; mers; ++mers) {
163         if((*filter_)(*mers))
164           ary_.set(*mers);
165         ++count;
166       }
167       break;
168 
169     case UPDATE:
170       mer_dna tmp;
171       for( ; mers; ++mers) {
172         if((*filter_)(*mers))
173           ary_.update_add(*mers, 1, tmp);
174         ++count;
175       }
176       break;
177     }
178 
179     ary_.done();
180   }
181 };
182 
183 // Counter with and without quality value
184 typedef mer_counter_base<file_vector::const_iterator, mer_iterator, sequence_parser> mer_counter;
185 typedef mer_counter_base<file_vector::const_iterator, mer_qual_iterator, sequence_qual_parser> mer_qual_counter;
186 
load_bloom_filter(const char * path)187 mer_dna_bloom_counter* load_bloom_filter(const char* path) {
188   std::ifstream in(path, std::ios::in|std::ios::binary);
189   jellyfish::file_header header(in);
190   if(!in.good())
191     err::die(err::msg() << "Failed to parse bloom filter file '" << path << "'");
192   if(header.format() != "bloomcounter")
193     err::die(err::msg() << "Invalid format '" << header.format() << "'. Expected 'bloomcounter'");
194   if(header.key_len() != mer_dna::k() * 2)
195     err::die("Invalid mer length in bloom filter");
196   jellyfish::hash_pair<mer_dna> fns(header.matrix(1), header.matrix(2));
197   auto res = new mer_dna_bloom_counter(header.size(), header.nb_hashes(), in, fns);
198   if(!in.good())
199     err::die("Bloom filter file is truncated");
200   in.close();
201   return res;
202 }
203 
204 // If get a termination signal, kill the manager and then kill myself.
205 static pid_t manager_pid = 0;
signal_handler(int sig)206 static void signal_handler(int sig) {
207   if(manager_pid)
208     kill(manager_pid, SIGTERM);
209   signal(sig, SIG_DFL);
210   kill(getpid(), sig);
211   _exit(EXIT_FAILURE); // Should not be reached
212 }
213 
count_main(int argc,char * argv[])214 int count_main(int argc, char *argv[])
215 {
216   auto start_time = system_clock::now();
217 
218   jellyfish::file_header header;
219   header.fill_standard();
220   header.set_cmdline(argc, argv);
221 
222   args.parse(argc, argv);
223 
224   if(args.min_qual_char_given && args.min_qual_char_arg.size() != 1)
225     count_main_cmdline::error("[-Q, --min-qual-char] must be one character.");
226 
227   mer_dna::k(args.mer_len_arg);
228 
229   std::unique_ptr<jellyfish::generator_manager> generator_manager;
230   if(args.generator_given) {
231     auto gm =
232       new jellyfish::generator_manager(args.generator_arg, args.Generators_arg,
233                                        args.shell_given ? args.shell_arg : (const char*)0);
234     generator_manager.reset(gm);
235     generator_manager->start();
236     manager_pid = generator_manager->pid();
237     struct sigaction act;
238     memset(&act, '\0', sizeof(act));
239     act.sa_handler = signal_handler;
240     assert(sigaction(SIGTERM, &act, 0) == 0);
241   }
242 
243   header.canonical(args.canonical_flag);
244   mer_hash ary(args.size_arg, args.mer_len_arg * 2, args.counter_len_arg, args.threads_arg, args.reprobes_arg);
245   if(args.disk_flag)
246     ary.do_size_doubling(false);
247 
248   std::unique_ptr<jellyfish::dumper_t<mer_array> > dumper;
249   if(args.text_flag)
250     dumper.reset(new text_dumper(args.threads_arg, args.output_arg, &header));
251   else
252     dumper.reset(new binary_dumper(args.out_counter_len_arg, ary.key_len(), args.threads_arg, args.output_arg, &header));
253   ary.dumper(dumper.get());
254 
255   auto after_init_time = system_clock::now();
256 
257   OPERATION do_op = COUNT;
258   if(args.if_given) {
259     mer_counter counter(args.threads_arg, ary,
260                         args.if_arg.begin(), args.if_arg.end(),
261                         args.if_arg.end(), args.if_arg.end(), // no multi pipes
262                         args.Files_arg, PRIME);
263     counter.exec_join(args.threads_arg);
264     do_op = UPDATE;
265   }
266 
267   // Iterators to the multi pipe paths. If no generator manager,
268   // generate an empty range.
269   auto pipes_begin = generator_manager.get() ? generator_manager->pipes().begin() : args.file_arg.end();
270   auto pipes_end = (bool)generator_manager ? generator_manager->pipes().end() : args.file_arg.end();
271 
272   // Bloom counter read from file to filter out low frequency
273   // k-mers. Two pass algorithm.
274   std::unique_ptr<filter> mer_filter(new filter);
275   std::unique_ptr<mer_dna_bloom_counter> bc;
276   if(args.bc_given) {
277     bc.reset(load_bloom_filter(args.bc_arg));
278     mer_filter.reset(new filter_bc(*bc));
279   }
280 
281   // Bloom filter to filter out low frequency k-mers. One pass
282   // algorithm.
283   std::unique_ptr<mer_dna_bloom_filter> bf;
284   if(args.bf_size_given) {
285     bf.reset(new mer_dna_bloom_filter(args.bf_fp_arg, args.bf_size_arg));
286     mer_filter.reset(new filter_bf(*bf));
287   }
288 
289   if(args.min_qual_char_given) {
290     mer_qual_counter counter(args.threads_arg, ary,
291                              args.file_arg.begin(), args.file_arg.end(),
292                              pipes_begin, pipes_end,
293                              args.Files_arg,
294                              do_op, mer_filter.get());
295     counter.exec_join(args.threads_arg);
296   } else {
297     mer_counter counter(args.threads_arg, ary,
298                         args.file_arg.begin(), args.file_arg.end(),
299                         pipes_begin, pipes_end,
300                         args.Files_arg,
301                         do_op, mer_filter.get());
302     counter.exec_join(args.threads_arg);
303   }
304 
305   // If we have a manager, wait for it
306   if(generator_manager) {
307     signal(SIGTERM, SIG_DFL);
308     manager_pid = 0;
309     if(!generator_manager->wait())
310       err::die("Some generator commands failed");
311     generator_manager.reset();
312   }
313 
314   auto after_count_time = system_clock::now();
315 
316   // If no intermediate files, dump directly into output file. If not, will do a round of merging
317   if(!args.no_write_flag) {
318     if(dumper->nb_files() == 0) {
319       dumper->one_file(true);
320       if(args.lower_count_given)
321         dumper->min(args.lower_count_arg);
322       if(args.upper_count_given)
323         dumper->max(args.upper_count_arg);
324       dumper->dump(ary.ary());
325     } else {
326       dumper->dump(ary.ary());
327       if(!args.no_merge_flag) {
328         std::vector<const char*> files = dumper->file_names_cstr();
329         uint64_t min = args.lower_count_given ? args.lower_count_arg : 0;
330         uint64_t max = args.upper_count_given ? args.upper_count_arg : std::numeric_limits<uint64_t>::max();
331         try {
332           merge_files(files, args.output_arg, header, min, max);
333         } catch(MergeError e) {
334           err::die(err::msg() << e.what());
335         }
336         if(!args.no_unlink_flag) {
337           for(int i =0; i < dumper->nb_files(); ++i)
338             unlink(files[i]);
339         }
340       } // if(!args.no_merge_flag
341     } // if(!args.no_merge_flag
342   }
343 
344   auto after_dump_time = system_clock::now();
345 
346   if(args.timing_given) {
347     std::ofstream timing_file(args.timing_arg);
348     timing_file << "Init     " << as_seconds(after_init_time - start_time) << "\n"
349                 << "Counting " << as_seconds(after_count_time - after_init_time) << "\n"
350                 << "Writing  " << as_seconds(after_dump_time - after_count_time) << "\n";
351   }
352 
353   return 0;
354 }
355