1 /* SuperRead pipeline
2  * Copyright (C) 2012  Genome group at University of Maryland.
3  *
4  * This program is free software: you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License as
6  * published by the Free Software Foundation, either version 3 of the
7  * License, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
16  */
17 
18 #ifndef _CREATE_K_UNITIGS_COMMON_H_
19 #define _CREATE_K_UNITIGS_COMMON_H_
20 
21 #include <jellyfish/thread_exec.hpp>
22 #include <jellyfish/mer_dna.hpp>
23 #include <jellyfish/mer_iterator.hpp>
24 #include <jellyfish/stream_manager.hpp>
25 #include <jellyfish/mer_overlap_sequence_parser.hpp>
26 #include <jellyfish/mer_dna_bloom_counter.hpp>
27 
28 #include <jflib/multiplexed_io.hpp>
29 #include <jellyfish/atomic_field.hpp>
30 
31 using jellyfish::mer_dna;
32 using jellyfish::thread_exec;
33 using jellyfish::mer_dna_bloom_counter;
34 using jellyfish::mer_dna_bloom_filter;
35 
36 // Wrapper around mer_iterator to satisfy common interface
37 typedef std::vector<const char*> file_vector;
38 typedef jellyfish::stream_manager<file_vector::const_iterator> stream_manager;
39 typedef jellyfish::mer_overlap_sequence_parser<stream_manager> sequence_parser;
40 typedef jellyfish::mer_iterator<sequence_parser, mer_dna> mer_iterator;
41 class read_mers {
42   mer_iterator stream_;
43 
44 public:
read_mers(sequence_parser & parser,int id)45   read_mers(sequence_parser& parser, int id) : stream_(parser, true) { }
operator bool() const46   operator bool() const { return (void*)stream_ != 0; }
operator ->() const47   const mer_dna* operator->() const { return stream_.operator->(); }
operator *() const48   const mer_dna& operator*() const { return stream_.operator*(); }
operator ++()49   read_mers& operator++() {
50     ++stream_;
51     return *this;
52   }
53 };
54 
55 // Wrapper around bloom filter class to have set compatible insert
56 // method.
57 class mer_bloom {
58   mer_dna_bloom_filter bf_;
59 
60 public:
mer_bloom(double fp,size_t size)61   mer_bloom(double fp, size_t size) : bf_(fp, size) { }
insert(const mer_dna & m)62   std::pair<unsigned int, bool> insert(const mer_dna& m) {
63     unsigned int r = bf_.insert(m);
64     return std::make_pair(r, r == 0);
65   }
66 };
67 
68 
69 // Insert a mer in a set and return true if the k-mer is new in the
70 // set.
71 template<typename set_type>
insert_canonical(set_type & set,const mer_dna & mer)72 bool insert_canonical(set_type& set, const mer_dna& mer) {
73   return set.insert(mer.get_canonical()).second;
74 }
75 
76 /* Read k-mers and store them in a map. The map_type must have the
77    operator[]. The content returned must have the prefix ++. All this
78    needs to be multi-thread safe.
79  */
80 template<typename map_type, typename parser_type, typename stream_type>
81 class populate_mer_set : public thread_exec {
82   int          mer_len_;
83   parser_type& parser_;
84   map_type&    set_;
85 
86 public:
populate_mer_set(int mer_len,map_type & set,parser_type & parser)87   populate_mer_set(int mer_len, map_type& set, parser_type& parser) :
88     mer_len_(mer_len), parser_(parser), set_(set)
89   { }
90 
start(int thid)91   void start(int thid) {
92     for(stream_type stream(parser_, thid); stream; ++stream)
93       ++set_[*stream];
94     set_.done();
95   }
96 };
97 
98 /* - mer_counts_type maps k-mer to counts. Has operator[] returning
99      the count.
100 
101    - used_type is a set type with operator insert. (set compatible)
102    - end_points_type is a set type with operator insert. (set compatible)
103    - parser_type is the type to generate iterator of k-mers
104    - stream_type is the k-mer iteratorp type
105    - args_type is the type of switches on command line.
106  */
107 template<typename mer_counts_type, typename used_type, typename end_points_type,
108          typename parser_type, typename stream_type,
109          typename args_type>
110 class create_k_unitig : public jellyfish::thread_exec {
111   const mer_counts_type&        counts_; // Counts for k-mers
112   used_type&                    used_mers_; // Mark all k-mers whether they have been visited already
113   end_points_type&              end_points_; // End points of k-unitigs, to ouput only once
114   int                           threads_;
115   parser_type&                  parser_;
116   jflib::o_multiplexer          output_multiplexer_;
117   jflib::atomic_field<uint64_t> unitig_id_;
118   const args_type&              args_;
119 
120   enum direction { forward = 1, backward = -1 };
rev_direction(direction dir)121   static direction rev_direction(direction dir) { return (direction)-dir; }
122 
123 public:
create_k_unitig(const mer_counts_type & counts,used_type & used,end_points_type & ends,int threads,parser_type & parser,std::ostream & output,const args_type & args)124   create_k_unitig(const mer_counts_type& counts, used_type& used, end_points_type& ends,
125                   int threads, parser_type& parser, std::ostream& output,
126                   const args_type& args) :
127     counts_(counts),
128     used_mers_(used),
129     end_points_(ends),
130     threads_(threads),
131     parser_(parser),
132     output_multiplexer_(&output, 3 * threads, 4096),
133     unitig_id_(0),
134     args_(args)
135   { }
136 
start(int thid)137   virtual void start(int thid) {
138     //    mer_stream<mer_dna, read_parser> stream(mer_dna::k(), parser_);
139     stream_type     stream(parser_, thid);
140     jflib::omstream output(output_multiplexer_);
141     mer_dna         current;
142     mer_dna         continuation;
143     mer_dna         tmp;
144 
145     for( ; stream; ++stream) {
146       auto is_new = used_mers_.insert(*stream);
147       if(!is_new.second)
148         continue;
149       // Never start a unitig on low count
150       if(counts_[*stream] < args_.quality_threshold_arg)
151         continue;
152       current = *stream;
153       //printf("Current mer %s\n",(current.to_str()).c_str());
154       // Grow unitig if a starting (branching) mer
155       if(starting_mer(forward, current)) {
156         //printf("Starting forward\n");
157         grow_unitig(backward, current, output);
158       } else if(starting_mer(backward, current)) {
159         //printf("Starting reverse\n");
160         grow_unitig(forward, current, output);
161       }
162       // Unique continuation on both sides -> middle of k-unitig: do nothing
163     }
164   }
165 
166 private:
167   // Check all k-mers extending in one direction. If unique
168   // continuation, store it in cont and return true. A unique
169   // continuation may have a count less than the min quality
170   // threshold, as will be reported in the count output argument.
171   //
172   // On the other hand, continuation with count less than the min
173   // quality threshold do not create a branch compared to a high
174   // quality continuation. I.e., if the threshold is 2, and the counts
175   // are as follow:
176   //
177   // 0, 1, 0, 0 -> unique continuation, count of 1
178   // 0, 1, 0, 1 -> no unique continuation
179   // 2, 0, 0, 0 -> unique continuation, count of 2
180   // 2, 1, 0, 0 -> unique continuation, count of 2
181   // 2, 0, 3, 0 -> no unique continuation
182   //
183   // Otherwise return false and the value of cont is undetermined. If
184   // true is returned and count is not NULL, the count of the unique
185   // continuation mer is stored in the pointer.
next_mer(const direction dir,const mer_dna & start,mer_dna & cont,unsigned int * count=0)186   bool next_mer(const direction dir, const mer_dna& start, mer_dna& cont,
187                 unsigned int* count = 0) {
188     int     index;
189     mer_dna cont_comp(start);
190     cont_comp.reverse_complement();
191     cont = start;
192 
193     if(dir == forward) {
194       cont.shift_left(0);
195       cont_comp.shift_right(0);
196       index = 0;
197     } else {
198       cont.shift_right(0);
199       cont_comp.shift_left(0);
200       index = cont.k() - 1;
201     }
202     auto base = cont.base(index); // Point to first or last base. Correct base to change
203     auto base_comp = cont_comp.base(cont.k() - 1 - index);
204 
205     int          nb_cont    = 0, nb_low_cont = 0;
206     int          code       = 0, low_code = 0;
207     unsigned int cont_count = 0, low_cont_count = 0;
208     for(int i = 0; i < 4; ++i) {
209       base      = i;
210       base_comp = mer_dna::complement(i);
211 
212       unsigned int cont_count_ = counts_[cont < cont_comp ? cont : cont_comp];
213       if(cont_count_ >= args_.quality_threshold_arg) {
214         if(++nb_cont > 1)
215           return false;
216         code       = i;
217         cont_count = cont_count_;
218       } else if(cont_count_ > 0) {
219         ++nb_low_cont;
220         low_code       = i;
221         low_cont_count = cont_count_;
222       }
223     }
224 
225     if(nb_cont == 1) {
226       base = code;
227       if(count)
228         *count = cont_count;
229       return true;
230     } else if(nb_cont == 0 && nb_low_cont == 1) {
231         base = low_code;
232         if(count)
233           *count = low_cont_count;
234         return true;
235     }
236     return false;
237   }
238 
239   // Return true if m is a starting mer in the given dir: a mer is a
240   // starting mer if it is branching forward, or backward, or dries
241   // out, maybe after some number of low count mer to skip.
starting_mer(direction dir,mer_dna m)242   bool starting_mer(direction dir, mer_dna m) {
243     int     low_cont = args_.cont_on_low_arg;
244     mer_dna tmp1, tmp2;
245 
246     while(true) {
247       unsigned int count = 0;
248       if(!next_mer(dir, m, tmp1, &count))
249         return true;
250       if(count >= args_.quality_threshold_arg) {
251         if(!next_mer(rev_direction(dir), tmp1, tmp2, &count))
252           return true;
253         break;
254       }
255       if(--low_cont < 0)
256         return true;
257       m = tmp1;
258     }
259     return false;
260   }
261 
grow_unitig(const direction dir,const mer_dna & start,jflib::omstream & output)262   void grow_unitig(const direction dir, const mer_dna& start, jflib::omstream& output) {
263     bool start_new = insert_canonical(end_points_, start);
264     if(!start_new)
265       return;
266     //printf("Starting unitig from %s\n",(start.to_str()).c_str());
267     mer_dna            mer1(start);
268     mer_dna            mer2;
269     mer_dna            mer3;
270     mer_dna           *current = &mer1;
271     mer_dna           *cont    = &mer2;
272     unsigned int       count   = 0;
273     unsigned int       low_run = 0;
274     unsigned int       index   = dir == forward ? 0 : start.k() - 1;
275     std::string        seq;
276     std::set<mer_dna>  set; // Current set of used mers to avoid endless loop
277 
278     insert_canonical(set, *current);
279 
280     while(true) {
281       insert_canonical(used_mers_, *current);
282       //if(!insert_canonical(set, *current))
283         //break; // loop. Don't output anything  //AZ modified to avoid loss of k-unitigs from loops
284       if(!next_mer(dir, *current, *cont, &count))
285         break;
286       //printf("Count next fwd %d mer %s\n",count,(current->to_str()).c_str());
287       if(!next_mer(rev_direction(dir), *cont, mer3))
288         break;
289       //printf("Count next rev %d mer %s\n",count,(cont->to_str()).c_str());
290       //printf("Mer3 %s\n",(mer3.to_str()).c_str());
291       // This can happen (only) with continuation on low. It does not
292       // create a branch as far as next_mer is concerned if one low
293       // count and one high count, but it still a branch in this case:
294       // there are two way to go through that region
295       if(mer3 != *current)
296         break;
297 
298       if(!insert_canonical(set, *cont)) //AZ we hit a loop -- do not extend
299           break;
300 
301       seq += (char)cont->base(index);
302       //printf("Seq cont is %s low_run is %d\n",seq.c_str(),low_run);
303       if(count < args_.quality_threshold_arg) {
304         if(++low_run > args_.cont_on_low_arg)
305           break;
306       } else
307         low_run = 0;
308 
309       std::swap(current, cont);
310     }
311     //printf("Done %s\n",seq.c_str());
312     // Erase trailing low quality bases if any and reset current to be
313     // the actual last k-mer (with only good quality bases). Needed
314     // for the test of already written k-unitigs to be accurate.
315     if(low_run > 0) {
316       seq.erase(seq.size() - std::min((unsigned int)seq.size(), low_run));
317       if(seq.size() >= current->k()) {
318         if(dir == forward) {
319           *current = seq.substr(seq.size() - current->k());
320         } else {
321           std::string end = seq.substr(seq.size() - current->k());
322           std::string rev(end.rbegin(), end.rend());
323           *current = rev;
324         }
325       } else {
326         // Sequence does not contain a full k-mer. Need to recreate it
327         // from the starting k-mer and seq by shifting.
328         *current = start;
329         for(auto it = seq.begin(); it != seq.end(); ++it)
330           if(dir == forward) {
331             current->shift_left(*it);
332           } else {
333             current->shift_right(*it);
334           }
335       }
336     }
337 
338     // If the last k-mer has been used in a k-unitigs already, this
339     // means two threads are working on the same unitigs, starting
340     // from opposite ends. Output only if the current thread has the
341     // "largest" end k-mer.
342     bool end_new = insert_canonical(end_points_, *current);
343     if(!end_new) {
344       if(start.get_canonical() < current->get_canonical())
345         return;
346     }
347     // Output results
348     if(start.k() + seq.length() < args_.min_len_arg)
349       return;
350     uint64_t id = (unitig_id_ += 1) - 1;
351     output << ">" << id << " length:" << (start.k() + seq.size()) << "\n";
352     if(dir == backward) {
353       std::string reversed(seq.rbegin(), seq.rend());
354       output << reversed << start << "\n";
355     } else {
356       output << start << seq << "\n";
357     }
358     output << jflib::endr;
359   }
360 };
361 
362 #endif /* _CREATE_K_UNITIGS_COMMON_H_ */
363