1 /* SuperRead pipeline
2 * Copyright (C) 2012 Genome group at University of Maryland.
3 *
4 * This program is free software: you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License as
6 * published by the Free Software Foundation, either version 3 of the
7 * License, or (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18
19 #include <vector>
20 #include <memory>
21 #include <limits>
22 #include <cmath>
23 #include <cstdlib>
24
25 #include <jellyfish/atomic_gcc.hpp>
26 #include <jellyfish/stream_manager.hpp>
27 #include <jellyfish/whole_sequence_parser.hpp>
28 #include <jellyfish/mer_dna.hpp>
29 #include <jellyfish/jellyfish.hpp>
30
31 #include <jflib/multiplexed_io.hpp>
32 #include <gzip_stream.hpp>
33
34 #include <src/mer_database.hpp>
35 #include <src/error_correct_reads.hpp>
36 #include <src/error_correct_reads_cmdline.hpp>
37 #include <src/verbose_log.hpp>
38
39 namespace err = jellyfish::err;
40
41 using jellyfish::mer_dna;
42 typedef std::vector<const char*> file_vector;
43 typedef jellyfish::stream_manager<file_vector::const_iterator> stream_manager;
44 typedef jellyfish::whole_sequence_parser<stream_manager> read_parser;
45
46
47 typedef uint64_t hkey_t;
48 typedef uint64_t hval_t;
49
50 static args_t args;
51
52 // Poisson term. Computes a term of the Poisson distribution: e^{-\lambda} \lambda^i / i!.
poisson_term(double lambda,unsigned int i)53 double poisson_term(double lambda, unsigned int i) {
54 static const double facts[11] = { 1, 1, 2, 6, 24, 120, 720, 5040, 40320, 362880, 3628800 };
55 static const double tau = 6.283185307179583;
56
57 if(i < 11)
58 return exp(-lambda) * pow(lambda, i) / facts[i];
59 else
60 return exp(-lambda + i) * pow(lambda / i, i) / sqrt(tau * i);
61 }
62
63 // Contaminant database. If a Jellyfish database is given, return true
64 // iff the k-mer is in the database. With no database, it always
65 // returns false.
66 class contaminant_check {
67 public:
~contaminant_check()68 virtual ~contaminant_check() { }
69
70 virtual bool is_contaminant(const mer_dna& m) = 0;
71 virtual bool is_contaminant(const mer_dna& m, mer_dna& tmp) = 0;
72 virtual void debug(const char*msg) = 0;
73 };
74
75 class contaminant_no_database : public contaminant_check {
76 public:
~contaminant_no_database()77 virtual ~contaminant_no_database() { }
is_contaminant(const mer_dna & m)78 virtual bool is_contaminant(const mer_dna& m) { return false; }
is_contaminant(const mer_dna & m,mer_dna & tmp)79 virtual bool is_contaminant(const mer_dna& m, mer_dna& tmp) { return false; }
debug(const char * msg)80 virtual void debug(const char* msg) { std::cerr << msg << " no database" << std::endl; }
81 };
82
83 class contaminant_database : public contaminant_check {
84 mer_array ary_;
85 public:
contaminant_database(binary_reader & reader,size_t size)86 contaminant_database(binary_reader& reader, size_t size) : ary_(size, mer_dna::k() * 2, 0, 126) {
87 while(reader.next()) {
88 if(!ary_.set(reader.key()))
89 throw std::runtime_error("Size of hash for contaminant too small");
90 }
91 }
~contaminant_database()92 virtual ~contaminant_database() { }
debug(const char * msg)93 virtual void debug(const char* msg) { std::cerr << msg << std::endl; }
is_contaminant(const mer_dna & m)94 virtual bool is_contaminant(const mer_dna& m) { return ary_.has_key(m); }
is_contaminant(const mer_dna & m,mer_dna & tmp)95 virtual bool is_contaminant(const mer_dna& m, mer_dna& tmp) {
96 size_t id;
97 return ary_.get_key_id(m, &id, tmp);
98 }
99 };
100
101 template<class instance_t>
102 class error_correct_t : public jellyfish::thread_exec {
103 read_parser _parser;
104 int _skip;
105 int _good;
106 int _anchor;
107 std::string _prefix;
108 int _min_count;
109 int _cutoff;
110 char _qual_cutoff;
111 int _window;
112 int _error;
113 bool _gzip;
114 const database_query* _mer_database;
115 contaminant_check* _contaminant;
116 bool _trim_contaminant;
117 int _homo_trim;
118 double _collision_prob; // collision probability = a priori error rate / 3
119 double _poisson_threshold;
120 bool _no_discard;
121
122 jflib::o_multiplexer * _output;
123 jflib::o_multiplexer * _log;
124 public:
error_correct_t(int nb_threads,stream_manager & streams)125 error_correct_t(int nb_threads, stream_manager& streams) :
126 _parser(4 * nb_threads, 100, 1, streams),
127 _skip(0), _good(1), _min_count(1), _cutoff(4), _window(0), _error(0), _gzip(false),
128 _mer_database(0), _contaminant(0), _trim_contaminant(false),
129 _homo_trim(std::numeric_limits<int>::min()), _no_discard(false) { }
130
131 private:
132 // Open the data (error corrected reads) and log files. Default to
133 // STDOUT and STDERR if none specified.
open_file(const std::string prefix,const char * suffix,const std::string def)134 std::ostream* open_file(const std::string prefix, const char* suffix,
135 const std::string def) {
136 std::ostream* res;
137 std::string file;
138
139 if(prefix.empty())
140 file = def;
141 else {
142 file = prefix;
143 file += suffix;
144 }
145 if(_gzip) {
146 if(!prefix.empty())
147 file += ".gz";
148 res = new gzipstream(file.c_str());
149 } else
150 res = new std::ofstream(file.c_str());
151 if(!res->good())
152 throw std::runtime_error(err::msg() << "Failed to open file '" << file << "'" << err::no);
153 res->exceptions(std::ios::eofbit|std::ios::failbit|std::ios::badbit);
154 return res;
155 }
156
157 public:
do_it(int nb_threads)158 void do_it(int nb_threads) {
159 // Make sure they are deleted when done
160 std::unique_ptr<std::ostream> details(open_file(_prefix, ".log", "/dev/fd/2"));
161 std::unique_ptr<std::ostream> output(open_file(_prefix, ".fa", "/dev/fd/1"));
162 // Multiplexers, same thing
163 std::unique_ptr<jflib::o_multiplexer>
164 log_m(new jflib::o_multiplexer(details.get(), 3 * nb_threads, 1024));
165 std::unique_ptr<jflib::o_multiplexer>
166 output_m(new jflib::o_multiplexer(output.get(), 3 * nb_threads, 1024));
167 _log = log_m.get();
168 _output = output_m.get();
169
170 exec_join(nb_threads);
171 }
172
start(int id)173 virtual void start(int id) {
174 instance_t(*this, id).start();
175 }
176
skip(int s)177 error_correct_t& skip(int s) { _skip = s; return *this; }
good(int g)178 error_correct_t& good(int g) { _good = g; return *this; }
anchor(int a)179 error_correct_t& anchor(int a) { _anchor = a; return *this; }
prefix(const char * s)180 error_correct_t& prefix(const char *s) { _prefix = s; return *this; }
prefix(const std::string s)181 error_correct_t& prefix(const std::string s) { _prefix = s; return *this; }
min_count(int m)182 error_correct_t& min_count(int m) { _min_count = m; return *this; }
cutoff(int p)183 error_correct_t& cutoff(int p) { _cutoff = p; return *this; }
qual_cutoff(char c)184 error_correct_t& qual_cutoff(char c) { _qual_cutoff = c; return *this; }
185 // error_corret_t & advance(int a) { _advance = a; return *this; }
window(int w)186 error_correct_t& window(int w) { _window = w; return *this; }
error(int e)187 error_correct_t& error(int e) { _error = e; return *this; }
gzip(bool g)188 error_correct_t& gzip(bool g) { _gzip = g; return *this; }
mer_database(database_query * q)189 error_correct_t& mer_database(database_query* q) { _mer_database = q; return *this; }
contaminant(contaminant_check * c)190 error_correct_t& contaminant(contaminant_check* c) { _contaminant = c; return *this; }
trim_contaminant(bool t)191 error_correct_t& trim_contaminant(bool t) { _trim_contaminant = t; return *this; }
homo_trim(int t)192 error_correct_t& homo_trim(int t) { _homo_trim = t; return *this; }
collision_prob(double cp)193 error_correct_t& collision_prob(double cp) { _collision_prob = cp; return *this; }
poisson_threshold(double t)194 error_correct_t& poisson_threshold(double t) { _poisson_threshold = t; return *this; }
no_discard(bool d)195 error_correct_t& no_discard(bool d) { _no_discard = d; return *this; }
196
parser()197 read_parser& parser() { return _parser; }
skip() const198 int skip() const { return _skip; }
good() const199 int good() const { return _good; }
anchor() const200 int anchor() const { return _anchor; }
prefix() const201 const std::string & prefix() const { return _prefix; }
min_count() const202 int min_count() const { return _min_count; }
cutoff() const203 int cutoff() const { return _cutoff; }
qual_cutoff() const204 char qual_cutoff() const { return _qual_cutoff; }
205 // int advance() const { return _advance; }
window() const206 int window() const { return _window ? _window : mer_dna::k(); }
error() const207 int error() const { return _error ? _error : mer_dna::k() / 2; }
gzip() const208 bool gzip() const { return _gzip; }
mer_database() const209 const database_query* mer_database() const { return _mer_database; }
contaminant() const210 contaminant_check* contaminant() const { return _contaminant; }
trim_contaminant() const211 bool trim_contaminant() const { return _trim_contaminant; }
do_homo_trim() const212 bool do_homo_trim() const { return _homo_trim != std::numeric_limits<int>::min(); }
homo_trim() const213 int homo_trim() const { return _homo_trim; }
collision_prob() const214 double collision_prob() const { return _collision_prob; }
poisson_threshold() const215 double poisson_threshold() const { return _poisson_threshold; }
no_discard() const216 bool no_discard() const { return _no_discard; }
217
output()218 jflib::o_multiplexer& output() { return *_output; }
log()219 jflib::o_multiplexer& log() { return *_log; }
220 };
221
222 class error_correct_instance {
223 public:
224 typedef error_correct_t<error_correct_instance> ec_t ;
225
226 private:
227 ec_t& _ec;
228 // int _id;
229 size_t _buff_size;
230 char* _buffer;
231 kmer_t _tmp_mer;
232 mer_dna _tmp_mer_dna;
233
234 static const char* error_contaminant;
235 static const char* error_no_starting_mer;
236 static const char* error_homopolymer;
237
238 public:
error_correct_instance(ec_t & ec,int id)239 error_correct_instance(ec_t& ec, int id) :
240 _ec(ec), _buff_size(0), _buffer(0) { }
241 // _ec(ec), _id(id), _buff_size(0), _buffer(0) { }
~error_correct_instance()242 ~error_correct_instance() {
243 free(_buffer);
244 }
245
start()246 void start() {
247 jflib::omstream output(_ec.output());
248 jflib::omstream details(_ec.log());
249 kmer_t mer, tmer;
250
251 uint64_t nb_reads = 0;
252 while(true) {
253 read_parser::job job(_ec.parser());
254 if(job.is_empty()) break;
255 for(size_t i = 0; i < job->nb_filled; ++i) {
256 if(i % 2 == 0)
257 output << jflib::endr;
258 const std::string& header = job->data[i].header;
259 const std::string& sequence = job->data[i].seq;
260 const char* const seq_s = sequence.c_str();
261 const char* const seq_e = seq_s + sequence.size();
262 const char* const qual_s = job->data[i].qual.c_str();
263
264 nb_reads++;
265 insure_length_buffer(sequence.size());
266
267 const char* error = "";
268 const char *input = seq_s + _ec.skip();
269 char *out = _buffer + _ec.skip();
270 //Prime system. Find and write starting k-mer
271 if(!find_starting_mer(mer, input, seq_e, out, &error)) {
272 details << "Skipped " << header << ": " << error << "\n";
273 details << jflib::endr;
274 if(_ec.no_discard())
275 output << ">" << header << "\nN\n";
276 continue;
277 }
278 // Extend forward and backward
279 tmer = mer;
280 const ssize_t start_off = input - seq_s;
281 forward_log fwd_log(_ec.window(), _ec.error());
282 char *end_out =
283 extend(forward_mer(tmer),
284 forward_ptr<const char>(input),
285 forward_ptr<const char>(qual_s + start_off),
286 forward_counter(start_off),
287 forward_ptr<const char>(seq_e),
288 forward_ptr<char>(out), fwd_log,
289 &error);
290 if(!end_out) {
291 details << "Skipped " << header << ": " << error << "\n";
292 details << jflib::endr;
293 if(_ec.no_discard())
294 output << ">" << header << "\nN\n";
295 continue;
296 }
297 assert(input > seq_s + mer_dna::k());
298 assert(out > _buffer + mer_dna::k());
299 assert(input - seq_s == out - _buffer);
300 tmer = mer;
301 backward_log bwd_log(_ec.window(), _ec.error());
302 char *start_out =
303 extend(backward_mer(tmer),
304 backward_ptr<const char>(input - mer_dna::k() - 1),
305 backward_ptr<const char>(qual_s + start_off - mer_dna::k() - 1),
306 backward_counter(start_off - mer_dna::k() - 1),
307 backward_ptr<const char>(seq_s - 1),
308 backward_ptr<char>(out - mer_dna::k() - 1), bwd_log,
309 &error);
310 if(!start_out) {
311 details << "Skipped " << header << ": " << error << "\n";
312 details << jflib::endr;
313 if(_ec.no_discard())
314 output << ">" << header << "\nN\n";
315 continue;
316 }
317 start_out++;
318 assert(start_out >= _buffer);
319 assert(_buffer + _buff_size >= end_out);
320
321 if(_ec.do_homo_trim()) {
322 end_out = homo_trim(_buffer, start_out, end_out, fwd_log, bwd_log, &error);
323 if(!end_out) {
324 details << "Skipped " << header << ": " << error << "\n";
325 details << jflib::endr;
326 if(_ec.no_discard())
327 output << ">" << header << "\nN\n";
328 continue;
329 }
330 }
331 assert(end_out >= _buffer);
332 assert(_buffer + _buff_size >= end_out);
333
334 output << ">" << header
335 << " " << fwd_log << " " << bwd_log << "\n"
336 << substr(start_out, end_out) << "\n";
337 } // for(size_t i... Loop over reads in job
338 } // while(true)... loop over all jobs
339 details.close();
340 output.close();
341 }
342
343 private:
344 enum log_code { OK, TRUNCATE, ERROR };
345
346 template<typename dir_mer, typename elog, typename counter>
check_contaminant(dir_mer & mer,elog & log,const counter & cpos,const char ** error)347 log_code check_contaminant(dir_mer& mer, elog& log, const counter& cpos, const char**error) {
348 if(_ec.contaminant()->is_contaminant(mer.canonical(), _tmp_mer_dna)) {
349 if(_ec.trim_contaminant()) {
350 log.truncation(cpos);
351 return TRUNCATE;
352 }
353 *error = error_contaminant;
354 return ERROR;
355 }
356 return OK;
357 }
358
359 // Log a substitution.
360 template<typename dir_mer, typename out_dir_ptr, typename elog, typename counter>
log_substitution(dir_mer & mer,out_dir_ptr & out,elog & log,const counter & cpos,int from,int to,const char ** error)361 log_code log_substitution(dir_mer& mer, out_dir_ptr& out, elog& log, const counter& cpos, int from, int to,
362 const char**error) {
363 if(from == to)
364 return OK;
365
366 mer.replace(0, to);
367 switch(log_code c = check_contaminant(mer, log, cpos, error)) {
368 case OK: break;
369 default: return c;
370 }
371
372 if(log.substitution(cpos, from >= 0 ? mer_dna::rev_code(from) : 'N', to >= 0 ? mer_dna::rev_code(to) : 'N')) {
373 int diff = log.remove_last_window();
374 out = out - diff;
375 log.truncation(cpos - diff);
376 return TRUNCATE;
377 }
378 return OK;
379 }
380
381 // Extend and correct read. Copy from input to out. mer should be
382 // represent a "good" starting k-mer at the input position.
383 // out point to the next character to be written.
384 template<typename dir_mer, typename in_dir_ptr, typename out_dir_ptr,
385 typename counter, typename elog>
extend(dir_mer mer,in_dir_ptr input,in_dir_ptr qual,counter pos,in_dir_ptr end,out_dir_ptr out,elog & log,const char ** error)386 char * extend(dir_mer mer, in_dir_ptr input, in_dir_ptr qual,
387 counter pos, in_dir_ptr end,
388 out_dir_ptr out, elog &log, const char** error) {
389 counter cpos = pos;
390 uint32_t prev_count = _ec.mer_database()->get_val(mer.canonical());
391
392 for( ; input < end; ++input, ++qual) {
393 const char base = *input;
394
395 if(base == '\n')
396 continue;
397 cpos = pos;
398 ++pos;
399
400 const int ori_code = mer_dna::code(base);
401 mer.shift(ori_code >= 0 ? ori_code : 0);
402 if(ori_code >= 0) {
403 switch(check_contaminant(mer, log, cpos, error)) {
404 case OK: break;
405 case TRUNCATE: goto done;
406 case ERROR: return 0;
407 }
408 }
409 uint64_t counts[4];
410 int ucode = 0;
411 int level;
412
413 const int count = _ec.mer_database()->get_best_alternatives(mer, counts, ucode, level);
414
415 // No coninuation whatsoever, trim.
416 if(count == 0) {
417 log.truncation(cpos);
418 goto done;
419 }
420
421 if(count == 1) { // One continuation. Is it an error?
422 prev_count = counts[ucode];
423 switch(log_substitution(mer, out, log, cpos, ori_code, ucode, error)) {
424 case OK: break;
425 case TRUNCATE: goto done;
426 case ERROR: return 0;
427 }
428 *out++ = mer.base(0);
429 continue;
430 }
431 // We get here if there is more than one alternative base to try
432 // at some level. Note that if the current base is low quality
433 // and all alternatives are higher quality, then the current
434 // base will have a count of zero. If the current base is non N
435 // and has a high count or previous count is low enough that
436 // continuity does not apply, output current base. But if the current
437 // base has count of zero, all alternatives are low quality and prev_count is low
438 // then trim
439 if(ori_code >= 0){ //if the current base is valid base (non N)
440 if(counts[ori_code] > (uint64_t)_ec.min_count()) {
441 if(counts[ori_code] >= (uint32_t)_ec.cutoff() || *qual >= _ec.qual_cutoff()) {
442 *out++ = mer.base(0);
443 continue;
444 }
445 // Now we ask for a probability of getting
446 // counts[ori_code] errors with p=1/300 in sum_counts
447 // trials. If this probability is < 10e-6, do not correct
448 double p = (double)(counts[0] + counts[1] + counts[2] + counts[3]) * _ec.collision_prob();
449 const double prob = poisson_term(p, counts[ori_code]);
450 if(prob < _ec.poisson_threshold()) {
451 *out++ = mer.base(0);
452 continue;
453 }
454 } else if(level == 0 && counts[ori_code] == 0) {
455 // definitely an error and all alternatives are low quality
456 log.truncation(cpos);
457 goto done;
458 }
459 } else if(level == 0) { //if all alternatives are low quality
460 log.truncation(cpos);
461 goto done;
462 }
463
464 // We get here if there are multiple possible substitutions, the
465 // original count is low enough and the previous count is high (good) or
466 // the current base is an N
467 // We find out all alternative bases
468 // that have a continuation at the same or better level. Then
469 // if the current base is N, pick the one with the highest
470 // count that is the most similar to the prev_count,
471 // otherwise pick the one with the most similar count.
472 // If no alternative continues, leave the base alone.
473 int check_code = ori_code;
474 bool success = false;
475 uint64_t cont_counts[4]; //here we record the counts for the continuations
476 bool continue_with_correct_base[4];
477 int read_nbase_code = -1;
478 bool candidate_continuations[4];
479 unsigned int ncandidate_continuations = 0;
480
481 //here we determine what the next base in the read is
482 if(input + 1 < end)
483 read_nbase_code = mer_dna::code(*(input + 1));
484
485 for(int i = 0; i < 4; ++i) {
486 cont_counts[i] = 0;
487 continue_with_correct_base[i] = false;
488 if(counts[i] <= (uint64_t)_ec.min_count())
489 continue;
490 check_code = i;
491 // Check that it continues at least one more base with that quality
492 _tmp_mer = mer.kmer();
493 dir_mer nmer = _tmp_mer;
494 nmer.replace(0, check_code);
495 // Does not matter what we shift, check all alternative anyway.
496 nmer.shift(0);
497
498 uint64_t ncounts[4];
499 int nucode = 0;
500 int nlevel;
501 const int ncount = _ec.mer_database()->get_best_alternatives(nmer, ncounts, nucode, nlevel);
502 if(ncount > 0 && nlevel >= level) {
503 continue_with_correct_base[i] = read_nbase_code >= 0 && ncounts[read_nbase_code] > 0;
504 success = true;
505 cont_counts[i] = counts[i];
506 }
507 }
508
509 if(success) {
510 // We found at least one alternative base that continues now
511 // we look for all alernatives that have a count closest to
512 // the previous count first we determine the count that is the
513 // closest to the current count but in the special case of
514 // prev_count == 1 we simply pick the largest count
515 check_code = -1;
516 uint32_t _prev_count = prev_count<=(uint64_t)_ec.min_count() ? std::numeric_limits<uint32_t>::max() : prev_count;
517 int min_diff = std::numeric_limits<int>::max();
518 for(int i = 0; i < 4; ++i) {
519 candidate_continuations[i] = false;
520 if(cont_counts[i] > 0)
521 min_diff = std::min(min_diff, (int)std::abs((long)cont_counts[i] - (long)_prev_count));
522 }
523
524 //we now know the count that is the closest, now we determine how many alternatives have this count
525 for(uint32_t i = 0; i < 4; i++) {
526 if(std::abs((long)cont_counts[i] - (long)_prev_count) == min_diff){
527 candidate_continuations[i] = true;
528 ++ncandidate_continuations;
529 check_code=i;
530 }
531 }
532
533 //do we have more than one good candidate? if we do then check which one continues with the correct base
534 if(ncandidate_continuations>1 && read_nbase_code >= 0)
535 for(int i = 0; i < 4; ++i){
536 if(candidate_continuations[i]){
537 if(!continue_with_correct_base[i])
538 --ncandidate_continuations;
539 else
540 check_code = i;
541 }
542 }
543
544 //fail if we still have more than one candidate
545 if(ncandidate_continuations != 1)
546 check_code = -1;
547
548 if(check_code >= 0) {
549 switch(log_substitution(mer, out, log, cpos, ori_code, check_code, error)) {
550 case OK: break;
551 case TRUNCATE: goto done;
552 case ERROR: return 0;
553 }
554 }
555 }
556 if(ori_code < 0 && check_code < 0) {// if invalid base and no good sub found
557 log.truncation(cpos);
558 goto done;
559 }
560 *out++ = mer.base(0);
561 }
562
563 done:
564 return out.ptr();
565 }
566
homo_trim(const char * start,char * out_start,char * out_end,forward_log & fwd_log,backward_log & bwd_log,const char ** error)567 char* homo_trim(const char* start, char* out_start, char* out_end,
568 forward_log& fwd_log, backward_log& bwd_log, const char** error) {
569 int max_homo_score = std::numeric_limits<int>::min();
570 char* max_pos = 0;
571 int homo_score = 0;
572 char* ptr = out_end - 1;
573 char pbase = mer_dna::code(*ptr);
574
575 for(--ptr; ptr >= out_start; --ptr) {
576 char cbase = mer_dna::code(*ptr);
577 homo_score += ((pbase == cbase) << 1) - 1; // Add 1 if same as last, -1 if not
578 pbase = cbase;
579 if(homo_score > max_homo_score) {
580 max_homo_score = homo_score;
581 max_pos = ptr;
582 }
583 }
584
585 if(max_homo_score < _ec.homo_trim())
586 return out_end; // Not a high score -> return without truncation
587 // assert(max_pos >= out_start);
588 // assert(max_pos >= start);
589 if(max_pos < out_start) {
590 *error = error_homopolymer;
591 return 0;
592 }
593 fwd_log.force_truncate(forward_counter(max_pos - start));
594 bwd_log.force_truncate(backward_counter(max_pos - start));
595 fwd_log.truncation(forward_counter(max_pos - start));
596 return max_pos;
597 }
598
insure_length_buffer(size_t len)599 void insure_length_buffer(size_t len) {
600 if(len <= _buff_size)
601 return;
602
603 _buff_size = len > 2 * _buff_size ? len + 100 : 2 * _buff_size;
604 _buffer = (char *)realloc(_buffer, _buff_size);
605 if(!_buffer)
606 throw std::runtime_error(err::msg() << "Buffer allocation failed, size " << _buffer << err::no);
607 }
608
find_starting_mer(kmer_t & mer,const char * & input,const char * end,char * & out,const char ** error)609 bool find_starting_mer(kmer_t &mer, const char * &input, const char *end, char * &out,
610 const char** error) {
611 while(input < end) {
612 for(int i = 0; input < end && i < (int)mer_dna::k(); ++i) {
613 char base = *input++;
614 *out++ = base;
615 if(!mer.shift_left(base))
616 i = -1; // If an N, skip to next k-mer
617 }
618 int found = 0;
619 while(input < end) {
620 bool contaminated = _ec.contaminant()->is_contaminant(mer.canonical(), _tmp_mer_dna);
621 if(contaminated && !_ec.trim_contaminant()) {
622 *error = error_contaminant;
623 return false;
624 }
625
626 if(!contaminated) {
627 hval_t val = _ec.mer_database()->get_val(mer.canonical());
628
629 found = (int)val >= _ec.anchor() ? found + 1 : 0;
630 if(found >= _ec.good())
631 return true;
632 }
633
634 char base = *input++;
635 *out++ = base;
636 if(!mer.shift_left(base))
637 break;
638 }
639 }
640
641 *error = error_no_starting_mer;
642 return false;
643 }
644 };
645
646 const char* error_correct_instance::error_contaminant = "Contaminated read";
647 const char* error_correct_instance::error_no_starting_mer = "No high quality mer";
648 const char* error_correct_instance::error_homopolymer = "Entire read is an homopolymer";
649
compute_poisson_cutoff__(const val_array_raw & counts,double collision_prob,double poisson_threshold)650 unsigned int compute_poisson_cutoff__(const val_array_raw& counts, double collision_prob, double poisson_threshold) {
651 auto it_end = counts.end();
652 uint64_t distinct = 0;
653 uint64_t total = 0;
654 for(auto it = counts.begin(); it != it_end; ++it) {
655 if((*it & 0x1) && (*it >= 2)) {
656 distinct += 1;
657 total += *it >> 1;
658 }
659 }
660 const double coverage = (double)total / (double)distinct;
661 vlog << "distinct mers:" << distinct << " total mers:" << total << " estimated coverage:" << coverage;
662 const double lambda = coverage * collision_prob;
663 vlog << "lambda:" << lambda << " collision_prob:" << collision_prob << " poisson_threshold:" << poisson_threshold;
664 for(unsigned int x = 2; x < 1000; ++x)
665 if(poisson_term(lambda, x) < poisson_threshold)
666 return x + 1;
667 return 0;
668 }
669
compute_poisson_cutoff(const val_array_raw & counts,double collision_prob,double poisson_threshold)670 unsigned int compute_poisson_cutoff(const val_array_raw& counts, double collision_prob, double poisson_threshold) {
671 vlog << "Computing Poisson cutoff";
672 unsigned int res = compute_poisson_cutoff__(counts, collision_prob, poisson_threshold);
673 return res;
674 }
675
main(int argc,char * argv[])676 int main(int argc, char *argv[])
677 {
678 args.parse(argc, argv);
679
680 if(args.qual_cutoff_char_given && args.qual_cutoff_char_arg.size() != 1)
681 args_t::error("The qual-cutoff-char must be one ASCII character.");
682 if(args.qual_cutoff_value_given && args.qual_cutoff_value_arg > (uint32_t)std::numeric_limits<char>::max())
683 args_t::error("The qual-cutoff-value must be in the range 0-127.");
684 const char qual_cutoff = args.qual_cutoff_char_given ? args.qual_cutoff_char_arg[0] :
685 (args.qual_cutoff_value_given ? (char)args.qual_cutoff_value_arg : std::numeric_limits<char>::max());
686
687 verbose_log::verbose = args.verbose_flag;
688 vlog << "Loading mer database";
689 database_query mer_database(args.db_arg, args.no_mmap_flag);
690 mer_dna::k(mer_database.header().key_len() / 2);
691
692 // Open contaminant database.
693 std::unique_ptr<contaminant_check> contaminant;
694 contaminant.reset(new contaminant_no_database());
695 if(args.contaminant_given) {
696 vlog << "Loading contaminant sequences";
697 std::ifstream contaminant_file(args.contaminant_arg);
698 if(!contaminant_file.good())
699 err::die(err::msg() << "Failed to open contaminant file '" << args.contaminant_arg << "'");
700 jellyfish::file_header header(contaminant_file);
701 if(header.format() != binary_dumper::format)
702 err::die(err::msg() << "Contaminant format expected '" << binary_dumper::format << "'");
703 if(mer_dna::k() * 2 != header.key_len())
704 err::die(err::msg() << "Contaminant mer length (" << (header.key_len() / 2)
705 << ") different than correction mer length (" << mer_dna::k() << ")");
706 binary_reader reader(contaminant_file, &header);
707 contaminant.reset(new contaminant_database(reader, header.size()));
708 }
709
710 stream_manager streams(args.sequence_arg.cbegin(), args.sequence_arg.cend(), 1);
711
712 const unsigned int cutoff = args.cutoff_given ?
713 args.cutoff_arg :
714 compute_poisson_cutoff(mer_database.vals(), args.apriori_error_rate_arg / 3,
715 args.poisson_threshold_arg / args.apriori_error_rate_arg);
716 vlog << "Using cutoff of " << cutoff;
717 if(cutoff == 0 && !args.cutoff_given)
718 err::die("Cutoff computation failed. Pass it explicitly with -p switch.");
719
720 error_correct_instance::ec_t correct(args.thread_arg, streams);
721 correct.skip(args.skip_arg).good(args.good_arg)
722 .anchor(args.anchor_count_arg)
723 .prefix(args.output_given ? (std::string)args.output_arg : "")
724 .min_count(args.min_count_arg)
725 .cutoff(cutoff)
726 .qual_cutoff(qual_cutoff)
727 .window(args.window_arg)
728 .error(args.error_arg)
729 .gzip(args.gzip_flag)
730 .mer_database(&mer_database)
731 .contaminant(contaminant.get())
732 .trim_contaminant(args.trim_contaminant_flag)
733 .homo_trim(args.homo_trim_given ? args.homo_trim_arg : std::numeric_limits<int>::min())
734 .collision_prob(args.apriori_error_rate_arg / 3)
735 .poisson_threshold(args.poisson_threshold_arg)
736 .no_discard(args.no_discard_flag);
737 vlog << "Correcting reads";
738 correct.do_it(args.thread_arg);
739 vlog << "Done";
740
741 return 0;
742 }
743
744