1 /* SuperRead pipeline
2 * Copyright (C) 2012 Genome group at University of Maryland.
3 *
4 * This program is free software: you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License as
6 * published by the Free Software Foundation, either version 3 of the
7 * License, or (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 #ifndef _CREATE_K_UNITIGS_COMMON_H_
19 #define _CREATE_K_UNITIGS_COMMON_H_
20
21 #include <jellyfish/thread_exec.hpp>
22 #include <jellyfish/mer_dna.hpp>
23 #include <jellyfish/mer_iterator.hpp>
24 #include <jellyfish/stream_manager.hpp>
25 #include <jellyfish/mer_overlap_sequence_parser.hpp>
26 #include <jellyfish/mer_dna_bloom_counter.hpp>
27
28 #include <jflib/multiplexed_io.hpp>
29 #include <jellyfish/atomic_field.hpp>
30
31 using jellyfish::mer_dna;
32 using jellyfish::thread_exec;
33 using jellyfish::mer_dna_bloom_counter;
34 using jellyfish::mer_dna_bloom_filter;
35
36 // Wrapper around mer_iterator to satisfy common interface
37 typedef std::vector<const char*> file_vector;
38 typedef jellyfish::stream_manager<file_vector::const_iterator> stream_manager;
39 typedef jellyfish::mer_overlap_sequence_parser<stream_manager> sequence_parser;
40 typedef jellyfish::mer_iterator<sequence_parser, mer_dna> mer_iterator;
41 class read_mers {
42 mer_iterator stream_;
43
44 public:
read_mers(sequence_parser & parser,int id)45 read_mers(sequence_parser& parser, int id) : stream_(parser, true) { }
operator bool() const46 operator bool() const { return (void*)stream_ != 0; }
operator ->() const47 const mer_dna* operator->() const { return stream_.operator->(); }
operator *() const48 const mer_dna& operator*() const { return stream_.operator*(); }
operator ++()49 read_mers& operator++() {
50 ++stream_;
51 return *this;
52 }
53 };
54
55 // Wrapper around bloom filter class to have set compatible insert
56 // method.
57 class mer_bloom {
58 mer_dna_bloom_filter bf_;
59
60 public:
mer_bloom(double fp,size_t size)61 mer_bloom(double fp, size_t size) : bf_(fp, size) { }
insert(const mer_dna & m)62 std::pair<unsigned int, bool> insert(const mer_dna& m) {
63 unsigned int r = bf_.insert(m);
64 return std::make_pair(r, r == 0);
65 }
66 };
67
68
69 // Insert a mer in a set and return true if the k-mer is new in the
70 // set.
71 template<typename set_type>
insert_canonical(set_type & set,const mer_dna & mer)72 bool insert_canonical(set_type& set, const mer_dna& mer) {
73 return set.insert(mer.get_canonical()).second;
74 }
75
76 /* Read k-mers and store them in a map. The map_type must have the
77 operator[]. The content returned must have the prefix ++. All this
78 needs to be multi-thread safe.
79 */
80 template<typename map_type, typename parser_type, typename stream_type>
81 class populate_mer_set : public thread_exec {
82 int mer_len_;
83 parser_type& parser_;
84 map_type& set_;
85
86 public:
populate_mer_set(int mer_len,map_type & set,parser_type & parser)87 populate_mer_set(int mer_len, map_type& set, parser_type& parser) :
88 mer_len_(mer_len), parser_(parser), set_(set)
89 { }
90
start(int thid)91 void start(int thid) {
92 for(stream_type stream(parser_, thid); stream; ++stream)
93 ++set_[*stream];
94 set_.done();
95 }
96 };
97
98 /* - mer_counts_type maps k-mer to counts. Has operator[] returning
99 the count.
100
101 - used_type is a set type with operator insert. (set compatible)
102 - end_points_type is a set type with operator insert. (set compatible)
103 - parser_type is the type to generate iterator of k-mers
104 - stream_type is the k-mer iteratorp type
105 - args_type is the type of switches on command line.
106 */
107 template<typename mer_counts_type, typename used_type, typename end_points_type,
108 typename parser_type, typename stream_type,
109 typename args_type>
110 class create_k_unitig : public jellyfish::thread_exec {
111 const mer_counts_type& counts_; // Counts for k-mers
112 used_type& used_mers_; // Mark all k-mers whether they have been visited already
113 end_points_type& end_points_; // End points of k-unitigs, to ouput only once
114 int threads_;
115 parser_type& parser_;
116 jflib::o_multiplexer output_multiplexer_;
117 jflib::atomic_field<uint64_t> unitig_id_;
118 const args_type& args_;
119
120 enum direction { forward = 1, backward = -1 };
rev_direction(direction dir)121 static direction rev_direction(direction dir) { return (direction)-dir; }
122
123 public:
create_k_unitig(const mer_counts_type & counts,used_type & used,end_points_type & ends,int threads,parser_type & parser,std::ostream & output,const args_type & args)124 create_k_unitig(const mer_counts_type& counts, used_type& used, end_points_type& ends,
125 int threads, parser_type& parser, std::ostream& output,
126 const args_type& args) :
127 counts_(counts),
128 used_mers_(used),
129 end_points_(ends),
130 threads_(threads),
131 parser_(parser),
132 output_multiplexer_(&output, 3 * threads, 4096),
133 unitig_id_(0),
134 args_(args)
135 { }
136
start(int thid)137 virtual void start(int thid) {
138 // mer_stream<mer_dna, read_parser> stream(mer_dna::k(), parser_);
139 stream_type stream(parser_, thid);
140 jflib::omstream output(output_multiplexer_);
141 mer_dna current;
142 mer_dna continuation;
143 mer_dna tmp;
144
145 for( ; stream; ++stream) {
146 auto is_new = used_mers_.insert(*stream);
147 if(!is_new.second)
148 continue;
149 // Never start a unitig on low count
150 if(counts_[*stream] < args_.quality_threshold_arg)
151 continue;
152 current = *stream;
153 //printf("Current mer %s\n",(current.to_str()).c_str());
154 // Grow unitig if a starting (branching) mer
155 if(starting_mer(forward, current)) {
156 //printf("Starting forward\n");
157 grow_unitig(backward, current, output);
158 } else if(starting_mer(backward, current)) {
159 //printf("Starting reverse\n");
160 grow_unitig(forward, current, output);
161 }
162 // Unique continuation on both sides -> middle of k-unitig: do nothing
163 }
164 }
165
166 private:
167 // Check all k-mers extending in one direction. If unique
168 // continuation, store it in cont and return true. A unique
169 // continuation may have a count less than the min quality
170 // threshold, as will be reported in the count output argument.
171 //
172 // On the other hand, continuation with count less than the min
173 // quality threshold do not create a branch compared to a high
174 // quality continuation. I.e., if the threshold is 2, and the counts
175 // are as follow:
176 //
177 // 0, 1, 0, 0 -> unique continuation, count of 1
178 // 0, 1, 0, 1 -> no unique continuation
179 // 2, 0, 0, 0 -> unique continuation, count of 2
180 // 2, 1, 0, 0 -> unique continuation, count of 2
181 // 2, 0, 3, 0 -> no unique continuation
182 //
183 // Otherwise return false and the value of cont is undetermined. If
184 // true is returned and count is not NULL, the count of the unique
185 // continuation mer is stored in the pointer.
next_mer(const direction dir,const mer_dna & start,mer_dna & cont,unsigned int * count=0)186 bool next_mer(const direction dir, const mer_dna& start, mer_dna& cont,
187 unsigned int* count = 0) {
188 int index;
189 mer_dna cont_comp(start);
190 cont_comp.reverse_complement();
191 cont = start;
192
193 if(dir == forward) {
194 cont.shift_left(0);
195 cont_comp.shift_right(0);
196 index = 0;
197 } else {
198 cont.shift_right(0);
199 cont_comp.shift_left(0);
200 index = cont.k() - 1;
201 }
202 auto base = cont.base(index); // Point to first or last base. Correct base to change
203 auto base_comp = cont_comp.base(cont.k() - 1 - index);
204
205 int nb_cont = 0, nb_low_cont = 0;
206 int code = 0, low_code = 0;
207 unsigned int cont_count = 0, low_cont_count = 0;
208 for(int i = 0; i < 4; ++i) {
209 base = i;
210 base_comp = mer_dna::complement(i);
211
212 unsigned int cont_count_ = counts_[cont < cont_comp ? cont : cont_comp];
213 if(cont_count_ >= args_.quality_threshold_arg) {
214 if(++nb_cont > 1)
215 return false;
216 code = i;
217 cont_count = cont_count_;
218 } else if(cont_count_ > 0) {
219 ++nb_low_cont;
220 low_code = i;
221 low_cont_count = cont_count_;
222 }
223 }
224
225 if(nb_cont == 1) {
226 base = code;
227 if(count)
228 *count = cont_count;
229 return true;
230 } else if(nb_cont == 0 && nb_low_cont == 1) {
231 base = low_code;
232 if(count)
233 *count = low_cont_count;
234 return true;
235 }
236 return false;
237 }
238
239 // Return true if m is a starting mer in the given dir: a mer is a
240 // starting mer if it is branching forward, or backward, or dries
241 // out, maybe after some number of low count mer to skip.
starting_mer(direction dir,mer_dna m)242 bool starting_mer(direction dir, mer_dna m) {
243 int low_cont = args_.cont_on_low_arg;
244 mer_dna tmp1, tmp2;
245
246 while(true) {
247 unsigned int count = 0;
248 if(!next_mer(dir, m, tmp1, &count))
249 return true;
250 if(count >= args_.quality_threshold_arg) {
251 if(!next_mer(rev_direction(dir), tmp1, tmp2, &count))
252 return true;
253 break;
254 }
255 if(--low_cont < 0)
256 return true;
257 m = tmp1;
258 }
259 return false;
260 }
261
grow_unitig(const direction dir,const mer_dna & start,jflib::omstream & output)262 void grow_unitig(const direction dir, const mer_dna& start, jflib::omstream& output) {
263 bool start_new = insert_canonical(end_points_, start);
264 if(!start_new)
265 return;
266 //printf("Starting unitig from %s\n",(start.to_str()).c_str());
267 mer_dna mer1(start);
268 mer_dna mer2;
269 mer_dna mer3;
270 mer_dna *current = &mer1;
271 mer_dna *cont = &mer2;
272 unsigned int count = 0;
273 unsigned int low_run = 0;
274 unsigned int index = dir == forward ? 0 : start.k() - 1;
275 std::string seq;
276 std::set<mer_dna> set; // Current set of used mers to avoid endless loop
277
278 insert_canonical(set, *current);
279
280 while(true) {
281 insert_canonical(used_mers_, *current);
282 //if(!insert_canonical(set, *current))
283 //break; // loop. Don't output anything //AZ modified to avoid loss of k-unitigs from loops
284 if(!next_mer(dir, *current, *cont, &count))
285 break;
286 //printf("Count next fwd %d mer %s\n",count,(current->to_str()).c_str());
287 if(!next_mer(rev_direction(dir), *cont, mer3))
288 break;
289 //printf("Count next rev %d mer %s\n",count,(cont->to_str()).c_str());
290 //printf("Mer3 %s\n",(mer3.to_str()).c_str());
291 // This can happen (only) with continuation on low. It does not
292 // create a branch as far as next_mer is concerned if one low
293 // count and one high count, but it still a branch in this case:
294 // there are two way to go through that region
295 if(mer3 != *current)
296 break;
297
298 if(!insert_canonical(set, *cont)) //AZ we hit a loop -- do not extend
299 break;
300
301 seq += (char)cont->base(index);
302 //printf("Seq cont is %s low_run is %d\n",seq.c_str(),low_run);
303 if(count < args_.quality_threshold_arg) {
304 if(++low_run > args_.cont_on_low_arg)
305 break;
306 } else
307 low_run = 0;
308
309 std::swap(current, cont);
310 }
311 //printf("Done %s\n",seq.c_str());
312 // Erase trailing low quality bases if any and reset current to be
313 // the actual last k-mer (with only good quality bases). Needed
314 // for the test of already written k-unitigs to be accurate.
315 if(low_run > 0) {
316 seq.erase(seq.size() - std::min((unsigned int)seq.size(), low_run));
317 if(seq.size() >= current->k()) {
318 if(dir == forward) {
319 *current = seq.substr(seq.size() - current->k());
320 } else {
321 std::string end = seq.substr(seq.size() - current->k());
322 std::string rev(end.rbegin(), end.rend());
323 *current = rev;
324 }
325 } else {
326 // Sequence does not contain a full k-mer. Need to recreate it
327 // from the starting k-mer and seq by shifting.
328 *current = start;
329 for(auto it = seq.begin(); it != seq.end(); ++it)
330 if(dir == forward) {
331 current->shift_left(*it);
332 } else {
333 current->shift_right(*it);
334 }
335 }
336 }
337
338 // If the last k-mer has been used in a k-unitigs already, this
339 // means two threads are working on the same unitigs, starting
340 // from opposite ends. Output only if the current thread has the
341 // "largest" end k-mer.
342 bool end_new = insert_canonical(end_points_, *current);
343 if(!end_new) {
344 if(start.get_canonical() < current->get_canonical())
345 return;
346 }
347 // Output results
348 if(start.k() + seq.length() < args_.min_len_arg)
349 return;
350 uint64_t id = (unitig_id_ += 1) - 1;
351 output << ">" << id << " length:" << (start.k() + seq.size()) << "\n";
352 if(dir == backward) {
353 std::string reversed(seq.rbegin(), seq.rend());
354 output << reversed << start << "\n";
355 } else {
356 output << start << seq << "\n";
357 }
358 output << jflib::endr;
359 }
360 };
361
362 #endif /* _CREATE_K_UNITIGS_COMMON_H_ */
363