1 /*  This file is part of Jellyfish.
2 
3     Jellyfish is free software: you can redistribute it and/or modify
4     it under the terms of the GNU General Public License as published by
5     the Free Software Foundation, either version 3 of the License, or
6     (at your option) any later version.
7 
8     Jellyfish is distributed in the hope that it will be useful,
9     but WITHOUT ANY WARRANTY; without even the implied warranty of
10     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11     GNU General Public License for more details.
12 
13     You should have received a copy of the GNU General Public License
14     along with Jellyfish.  If not, see <http://www.gnu.org/licenses/>.
15 */
16 
17 #include <signal.h>
18 
19 #include <iostream>
20 #include <fstream>
21 #include <vector>
22 #include <chrono>
23 
24 #include <jellyfish/err.hpp>
25 #include <jellyfish/mer_overlap_sequence_parser.hpp>
26 #include <jellyfish/mer_iterator.hpp>
27 #include <jellyfish/stream_manager.hpp>
28 #include <jellyfish/generator_manager.hpp>
29 #include <jellyfish/mer_dna_bloom_counter.hpp>
30 #include <jellyfish/thread_exec.hpp>
31 #include <jellyfish/file_header.hpp>
32 #include <sub_commands/bc_main_cmdline.hpp>
33 
34 namespace err = jellyfish::err;
35 
36 using std::chrono::system_clock;
37 using std::chrono::duration;
38 using std::chrono::duration_cast;
39 
// Convert a std::chrono duration of any representation/period into a
// number of seconds expressed as a double.
template<typename DtnType>
inline double as_seconds(DtnType dtn) {
  // duration<double> uses the default period of one second.
  typedef std::chrono::duration<double> fractional_seconds;
  const fractional_seconds secs = std::chrono::duration_cast<fractional_seconds>(dtn);
  return secs.count();
}
42 
43 static bc_main_cmdline args; // Command line switches and arguments
44 typedef std::vector<const char*> file_vector;
45 using jellyfish::mer_dna;
46 using jellyfish::mer_dna_bloom_counter;
47 typedef jellyfish::mer_overlap_sequence_parser<jellyfish::stream_manager<file_vector::const_iterator> > sequence_parser;
48 typedef jellyfish::mer_iterator<sequence_parser, jellyfish::mer_dna> mer_iterator;
49 
50 template<typename PathIterator>
51 class mer_bloom_counter : public jellyfish::thread_exec {
52   int                                     nb_threads_;
53   mer_dna_bloom_counter&                  filter_;
54   jellyfish::stream_manager<PathIterator> streams_;
55   sequence_parser                         parser_;
56 
57 public:
mer_bloom_counter(int nb_threads,mer_dna_bloom_counter & filter,PathIterator file_begin,PathIterator file_end,PathIterator pipe_begin,PathIterator pipe_end,uint32_t concurent_files)58   mer_bloom_counter(int nb_threads, mer_dna_bloom_counter& filter,
59                     PathIterator file_begin, PathIterator file_end,
60                     PathIterator pipe_begin, PathIterator pipe_end,
61                     uint32_t concurent_files) :
62     filter_(filter),
63     streams_(file_begin, file_end, pipe_begin, pipe_end, concurent_files),
64     parser_(jellyfish::mer_dna::k(), streams_.nb_streams(), 3 * nb_threads, 4096, streams_)
65   { }
66 
start(int thid)67   virtual void start(int thid) {
68     for(mer_iterator mers(parser_, args.canonical_flag) ; mers; ++mers) {
69       filter_.insert(*mers);
70     }
71   }
72 };
73 
// Pid of the generator manager process, or 0 when there is none.
static pid_t manager_pid = 0;

// Handler for a termination signal: forward SIGTERM to the generator
// manager (if one is registered), then re-send the received signal to
// this process with its default disposition restored.
static void signal_handler(int sig) {
  const pid_t manager = manager_pid;
  if(manager != 0)
    kill(manager, SIGTERM);

  // Reset to the default action so that re-sending the signal does not
  // come back into this handler.
  signal(sig, SIG_DFL);
  kill(getpid(), sig);

  // Fallback exit in case the signal sent above did not end the process.
  _exit(EXIT_FAILURE);
}
83 
bc_main(int argc,char * argv[])84 int bc_main(int argc, char *argv[])
85 {
86   auto start_time = system_clock::now();
87 
88   jellyfish::file_header header;
89   header.fill_standard();
90   header.set_cmdline(argc, argv);
91 
92   args.parse(argc, argv);
93   mer_dna::k(args.mer_len_arg);
94 
95   std::unique_ptr<jellyfish::generator_manager> generator_manager;
96   if(args.generator_given) {
97     auto gm =
98       new jellyfish::generator_manager(args.generator_arg, args.Generators_arg,
99                                        args.shell_given ? args.shell_arg : (const char*)0);
100     generator_manager.reset(gm);
101     generator_manager->start();
102     manager_pid = generator_manager->pid();
103     struct sigaction act;
104     memset(&act, '\0', sizeof(act));
105     act.sa_handler = signal_handler;
106     assert(sigaction(SIGTERM, &act, 0) == 0);
107   }
108 
109   header.canonical(args.canonical_flag);
110   std::ofstream output(args.output_arg);
111   if(!output.good())
112     err::die(err::msg() << "Can't open output file '" << args.output_arg << "'");
113 
114   header.format("bloomcounter");
115   header.key_len(args.mer_len_arg * 2);
116   jellyfish::hash_pair<mer_dna> hash_fns;
117   header.matrix(hash_fns.m1, 1);
118   header.matrix(hash_fns.m2, 2);
119 
120   mer_dna_bloom_counter filter(args.fpr_arg, args.size_arg, hash_fns);
121   header.size(filter.m());
122   header.nb_hashes(filter.k());
123   header.write(output);
124 
125   auto after_init_time = system_clock::now();
126 
127   // Iterators to the multi pipe paths. If no generator manager,
128   // generate an empty range.
129   auto pipes_begin = generator_manager.get() ? generator_manager->pipes().begin() : args.file_arg.end();
130   auto pipes_end = (bool)generator_manager ? generator_manager->pipes().end() : args.file_arg.end();
131 
132   mer_bloom_counter<file_vector::const_iterator> counter(args.threads_arg, filter,
133                                                          args.file_arg.begin(), args.file_arg.end(),
134                                                          pipes_begin, pipes_end, args.Files_arg);
135   counter.exec_join(args.threads_arg);
136 
137   // If we have a manager, wait for it
138   if(generator_manager) {
139     signal(SIGTERM, SIG_DFL);
140     manager_pid = 0;
141     if(!generator_manager->wait())
142       err::die("Some generator commands failed");
143     generator_manager.reset();
144   }
145 
146   auto after_count_time = system_clock::now();
147 
148   filter.write_bits(output);
149   output.close();
150 
151   auto after_dump_time = system_clock::now();
152 
153   if(args.timing_given) {
154     std::ofstream timing_file(args.timing_arg);
155     timing_file << "Init     " << as_seconds(after_init_time - start_time) << "\n"
156                 << "Counting " << as_seconds(after_count_time - after_init_time) << "\n"
157                 << "Writing  " << as_seconds(after_dump_time - after_count_time) << "\n";
158   }
159 
160   return 0;
161 }
162