1 /* SuperRead pipeline
2  * Copyright (C) 2012  Genome group at University of Maryland.
3  *
4  * This program is free software: you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License as
6  * published by the Free Software Foundation, either version 3 of the
7  * License, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
16  */
17 
18 
19 #include <vector>
20 #include <memory>
21 #include <limits>
22 #include <cmath>
23 #include <cstdlib>
24 
25 #include <jellyfish/atomic_gcc.hpp>
26 #include <jellyfish/stream_manager.hpp>
27 #include <jellyfish/whole_sequence_parser.hpp>
28 #include <jellyfish/mer_dna.hpp>
29 #include <jellyfish/jellyfish.hpp>
30 
31 #include <jflib/multiplexed_io.hpp>
32 #include <gzip_stream.hpp>
33 
34 #include <src/mer_database.hpp>
35 #include <src/error_correct_reads.hpp>
36 #include <src/error_correct_reads_cmdline.hpp>
37 #include <src/verbose_log.hpp>
38 
39 namespace err = jellyfish::err;
40 
41 using jellyfish::mer_dna;
42 typedef std::vector<const char*> file_vector;
43 typedef jellyfish::stream_manager<file_vector::const_iterator> stream_manager;
44 typedef jellyfish::whole_sequence_parser<stream_manager> read_parser;
45 
46 
47 typedef uint64_t hkey_t;
48 typedef uint64_t hval_t;
49 
50 static args_t args;
51 
52 // Poisson term. Computes a term of the Poisson distribution: e^{-\lambda} \lambda^i / i!.
poisson_term(double lambda,unsigned int i)53 double poisson_term(double lambda, unsigned int i) {
54   static const double facts[11] = { 1, 1, 2, 6, 24, 120, 720, 5040, 40320, 362880, 3628800 };
55   static const double tau       = 6.283185307179583;
56 
57   if(i < 11)
58     return exp(-lambda) * pow(lambda, i) / facts[i];
59   else
60     return exp(-lambda + i) * pow(lambda / i, i) / sqrt(tau * i);
61 }
62 
63 // Contaminant database. If a Jellyfish database is given, return true
64 // iff the k-mer is in the database. With no database, it always
65 // returns false.
66 class contaminant_check {
67 public:
~contaminant_check()68   virtual ~contaminant_check() { }
69 
70   virtual bool is_contaminant(const mer_dna& m) = 0;
71   virtual bool is_contaminant(const mer_dna& m, mer_dna& tmp) = 0;
72   virtual void debug(const char*msg) = 0;
73 };
74 
75 class contaminant_no_database : public contaminant_check {
76 public:
~contaminant_no_database()77   virtual ~contaminant_no_database() { }
is_contaminant(const mer_dna & m)78   virtual bool is_contaminant(const mer_dna& m) { return false; }
is_contaminant(const mer_dna & m,mer_dna & tmp)79   virtual bool is_contaminant(const mer_dna& m, mer_dna& tmp) { return false; }
debug(const char * msg)80   virtual void debug(const char* msg) { std::cerr << msg << " no database" << std::endl; }
81 };
82 
83 class contaminant_database : public contaminant_check {
84   mer_array ary_;
85 public:
contaminant_database(binary_reader & reader,size_t size)86   contaminant_database(binary_reader& reader, size_t size) : ary_(size, mer_dna::k() * 2, 0, 126) {
87     while(reader.next()) {
88       if(!ary_.set(reader.key()))
89         throw std::runtime_error("Size of hash for contaminant too small");
90     }
91   }
~contaminant_database()92   virtual ~contaminant_database() { }
debug(const char * msg)93   virtual void debug(const char* msg) { std::cerr << msg << std::endl; }
is_contaminant(const mer_dna & m)94   virtual bool is_contaminant(const mer_dna& m) { return ary_.has_key(m); }
is_contaminant(const mer_dna & m,mer_dna & tmp)95   virtual bool is_contaminant(const mer_dna& m, mer_dna& tmp) {
96     size_t id;
97     return ary_.get_key_id(m, &id, tmp);
98   }
99 };
100 
101 template<class instance_t>
102 class error_correct_t : public jellyfish::thread_exec {
103   read_parser            _parser;
104   int                    _skip;
105   int                    _good;
106   int                    _anchor;
107   std::string            _prefix;
108   int                    _min_count;
109   int			 _cutoff;
110   char                   _qual_cutoff;
111   int                    _window;
112   int                    _error;
113   bool                   _gzip;
114   const database_query*  _mer_database;
115   contaminant_check*     _contaminant;
116   bool                   _trim_contaminant;
117   int                    _homo_trim;
118   double                 _collision_prob; // collision probability = a priori error rate / 3
119   double                 _poisson_threshold;
120   bool                   _no_discard;
121 
122   jflib::o_multiplexer * _output;
123   jflib::o_multiplexer * _log;
124 public:
error_correct_t(int nb_threads,stream_manager & streams)125   error_correct_t(int nb_threads, stream_manager& streams) :
126     _parser(4 * nb_threads, 100, 1, streams),
127     _skip(0), _good(1), _min_count(1), _cutoff(4), _window(0), _error(0), _gzip(false),
128     _mer_database(0), _contaminant(0), _trim_contaminant(false),
129     _homo_trim(std::numeric_limits<int>::min()), _no_discard(false) { }
130 
131 private:
132   // Open the data (error corrected reads) and log files. Default to
133   // STDOUT and STDERR if none specified.
open_file(const std::string prefix,const char * suffix,const std::string def)134   std::ostream* open_file(const std::string prefix, const char* suffix,
135                           const std::string def) {
136     std::ostream* res;
137     std::string file;
138 
139     if(prefix.empty())
140       file = def;
141     else {
142       file = prefix;
143       file += suffix;
144     }
145     if(_gzip) {
146       if(!prefix.empty())
147         file += ".gz";
148       res = new gzipstream(file.c_str());
149     } else
150       res = new std::ofstream(file.c_str());
151     if(!res->good())
152       throw std::runtime_error(err::msg() << "Failed to open file '" << file << "'" << err::no);
153     res->exceptions(std::ios::eofbit|std::ios::failbit|std::ios::badbit);
154     return res;
155   }
156 
157 public:
do_it(int nb_threads)158   void do_it(int nb_threads) {
159     // Make sure they are deleted when done
160     std::unique_ptr<std::ostream> details(open_file(_prefix, ".log", "/dev/fd/2"));
161     std::unique_ptr<std::ostream> output(open_file(_prefix, ".fa", "/dev/fd/1"));
162     // Multiplexers, same thing
163     std::unique_ptr<jflib::o_multiplexer>
164       log_m(new jflib::o_multiplexer(details.get(), 3 * nb_threads, 1024));
165     std::unique_ptr<jflib::o_multiplexer>
166       output_m(new jflib::o_multiplexer(output.get(), 3 * nb_threads, 1024));
167     _log    = log_m.get();
168     _output = output_m.get();
169 
170     exec_join(nb_threads);
171   }
172 
start(int id)173   virtual void start(int id) {
174     instance_t(*this, id).start();
175   }
176 
skip(int s)177   error_correct_t& skip(int s) { _skip = s; return *this; }
good(int g)178   error_correct_t& good(int g) { _good = g; return *this; }
anchor(int a)179   error_correct_t& anchor(int a) { _anchor = a; return *this; }
prefix(const char * s)180   error_correct_t& prefix(const char *s) { _prefix = s; return *this; }
prefix(const std::string s)181   error_correct_t& prefix(const std::string s) { _prefix = s; return *this; }
min_count(int m)182   error_correct_t& min_count(int m) { _min_count = m; return *this; }
cutoff(int p)183   error_correct_t& cutoff(int p) { _cutoff = p; return *this; }
qual_cutoff(char c)184   error_correct_t& qual_cutoff(char c) { _qual_cutoff = c; return *this; }
185   //  error_corret_t & advance(int a) { _advance = a; return *this; }
window(int w)186   error_correct_t& window(int w) { _window = w; return *this; }
error(int e)187   error_correct_t& error(int e) { _error = e; return *this; }
gzip(bool g)188   error_correct_t& gzip(bool g) { _gzip = g; return *this; }
mer_database(database_query * q)189   error_correct_t& mer_database(database_query* q) { _mer_database = q; return *this; }
contaminant(contaminant_check * c)190   error_correct_t& contaminant(contaminant_check* c) { _contaminant = c; return *this; }
trim_contaminant(bool t)191   error_correct_t& trim_contaminant(bool t) { _trim_contaminant = t; return *this; }
homo_trim(int t)192   error_correct_t& homo_trim(int t) { _homo_trim = t; return *this; }
collision_prob(double cp)193   error_correct_t& collision_prob(double cp) { _collision_prob = cp; return *this; }
poisson_threshold(double t)194   error_correct_t& poisson_threshold(double t) { _poisson_threshold = t; return *this; }
no_discard(bool d)195   error_correct_t& no_discard(bool d) { _no_discard = d; return *this; }
196 
parser()197   read_parser& parser() { return _parser; }
skip() const198   int skip() const { return _skip; }
good() const199   int good() const { return _good; }
anchor() const200   int anchor() const { return _anchor; }
prefix() const201   const std::string & prefix() const { return _prefix; }
min_count() const202   int min_count() const { return _min_count; }
cutoff() const203   int cutoff() const { return _cutoff; }
qual_cutoff() const204   char qual_cutoff() const { return _qual_cutoff; }
205   //  int advance() const { return _advance; }
window() const206   int window() const { return _window ? _window : mer_dna::k(); }
error() const207   int error() const { return _error ? _error : mer_dna::k() / 2; }
gzip() const208   bool gzip() const { return _gzip; }
mer_database() const209   const database_query* mer_database() const { return _mer_database; }
contaminant() const210   contaminant_check* contaminant() const { return _contaminant; }
trim_contaminant() const211   bool trim_contaminant() const { return _trim_contaminant; }
do_homo_trim() const212   bool do_homo_trim() const { return _homo_trim != std::numeric_limits<int>::min(); }
homo_trim() const213   int homo_trim() const { return _homo_trim; }
collision_prob() const214   double collision_prob() const { return _collision_prob; }
poisson_threshold() const215   double poisson_threshold() const { return _poisson_threshold; }
no_discard() const216   bool no_discard() const { return _no_discard; }
217 
output()218   jflib::o_multiplexer& output() { return *_output; }
log()219   jflib::o_multiplexer& log() { return *_log; }
220 };
221 
222 class error_correct_instance {
223 public:
224   typedef error_correct_t<error_correct_instance> ec_t ;
225 
226 private:
227   ec_t&   _ec;
228   //  int     _id;
229   size_t  _buff_size;
230   char*   _buffer;
231   kmer_t  _tmp_mer;
232   mer_dna _tmp_mer_dna;
233 
234   static const char* error_contaminant;
235   static const char* error_no_starting_mer;
236   static const char* error_homopolymer;
237 
238 public:
error_correct_instance(ec_t & ec,int id)239   error_correct_instance(ec_t& ec, int id) :
240     _ec(ec), _buff_size(0), _buffer(0) { }
241     //    _ec(ec), _id(id), _buff_size(0), _buffer(0) { }
~error_correct_instance()242   ~error_correct_instance() {
243     free(_buffer);
244   }
245 
start()246   void start() {
247     jflib::omstream output(_ec.output());
248     jflib::omstream details(_ec.log());
249     kmer_t          mer, tmer;
250 
251     uint64_t nb_reads = 0;
252     while(true) {
253       read_parser::job job(_ec.parser());
254       if(job.is_empty()) break;
255       for(size_t i = 0; i < job->nb_filled; ++i) {
256         if(i % 2 == 0)
257           output << jflib::endr;
258         const std::string& header   = job->data[i].header;
259         const std::string& sequence = job->data[i].seq;
260         const char* const  seq_s    = sequence.c_str();
261         const char* const  seq_e    = seq_s + sequence.size();
262         const char* const  qual_s   = job->data[i].qual.c_str();
263 
264         nb_reads++;
265         insure_length_buffer(sequence.size());
266 
267         const char* error = "";
268         const char *input = seq_s + _ec.skip();
269         char       *out   = _buffer + _ec.skip();
270         //Prime system. Find and write starting k-mer
271         if(!find_starting_mer(mer, input, seq_e, out, &error)) {
272           details << "Skipped " << header << ": " << error << "\n";
273           details << jflib::endr;
274           if(_ec.no_discard())
275             output << ">" << header << "\nN\n";
276           continue;
277         }
278         // Extend forward and backward
279         tmer = mer;
280         const ssize_t start_off = input - seq_s;
281         forward_log fwd_log(_ec.window(), _ec.error());
282         char *end_out =
283           extend(forward_mer(tmer),
284                  forward_ptr<const char>(input),
285                  forward_ptr<const char>(qual_s + start_off),
286                  forward_counter(start_off),
287                  forward_ptr<const char>(seq_e),
288                  forward_ptr<char>(out), fwd_log,
289                  &error);
290         if(!end_out) {
291           details << "Skipped " << header << ": " << error << "\n";
292           details << jflib::endr;
293           if(_ec.no_discard())
294             output << ">" << header << "\nN\n";
295           continue;
296         }
297         assert(input > seq_s + mer_dna::k());
298         assert(out > _buffer + mer_dna::k());
299         assert(input - seq_s == out - _buffer);
300         tmer = mer;
301         backward_log bwd_log(_ec.window(), _ec.error());
302         char *start_out =
303           extend(backward_mer(tmer),
304                  backward_ptr<const char>(input - mer_dna::k() - 1),
305                  backward_ptr<const char>(qual_s + start_off - mer_dna::k() - 1),
306                  backward_counter(start_off - mer_dna::k() - 1),
307                  backward_ptr<const char>(seq_s - 1),
308                  backward_ptr<char>(out - mer_dna::k() - 1), bwd_log,
309                  &error);
310         if(!start_out) {
311           details << "Skipped " << header << ": " << error << "\n";
312           details << jflib::endr;
313           if(_ec.no_discard())
314             output << ">" << header << "\nN\n";
315           continue;
316         }
317         start_out++;
318         assert(start_out >= _buffer);
319         assert(_buffer + _buff_size >= end_out);
320 
321         if(_ec.do_homo_trim()) {
322           end_out = homo_trim(_buffer, start_out, end_out, fwd_log, bwd_log, &error);
323           if(!end_out) {
324             details << "Skipped " << header << ": " << error << "\n";
325             details << jflib::endr;
326             if(_ec.no_discard())
327               output << ">" << header << "\nN\n";
328             continue;
329           }
330         }
331         assert(end_out >= _buffer);
332         assert(_buffer + _buff_size >= end_out);
333 
334         output << ">" << header
335                << " " << fwd_log << " " << bwd_log << "\n"
336                << substr(start_out, end_out) << "\n";
337       } // for(size_t i...  Loop over reads in job
338     } // while(true)... loop over all jobs
339     details.close();
340     output.close();
341   }
342 
343 private:
344   enum log_code { OK, TRUNCATE, ERROR };
345 
346   template<typename dir_mer, typename elog, typename counter>
check_contaminant(dir_mer & mer,elog & log,const counter & cpos,const char ** error)347   log_code check_contaminant(dir_mer& mer, elog& log, const counter& cpos, const char**error) {
348     if(_ec.contaminant()->is_contaminant(mer.canonical(), _tmp_mer_dna)) {
349       if(_ec.trim_contaminant()) {
350         log.truncation(cpos);
351         return TRUNCATE;
352       }
353       *error = error_contaminant;
354       return ERROR;
355     }
356     return OK;
357   }
358 
359   // Log a substitution.
360   template<typename dir_mer, typename out_dir_ptr, typename elog, typename counter>
log_substitution(dir_mer & mer,out_dir_ptr & out,elog & log,const counter & cpos,int from,int to,const char ** error)361   log_code log_substitution(dir_mer& mer, out_dir_ptr& out, elog& log, const counter& cpos, int from, int to,
362                             const char**error) {
363     if(from == to)
364       return OK;
365 
366     mer.replace(0, to);
367     switch(log_code c = check_contaminant(mer, log, cpos, error)) {
368     case OK: break;
369     default: return c;
370     }
371 
372     if(log.substitution(cpos, from >= 0 ? mer_dna::rev_code(from) : 'N', to >= 0 ? mer_dna::rev_code(to) : 'N')) {
373       int diff = log.remove_last_window();
374       out = out - diff;
375       log.truncation(cpos - diff);
376       return TRUNCATE;
377     }
378     return OK;
379   }
380 
381   // Extend and correct read. Copy from input to out. mer should be
382   // represent a "good" starting k-mer at the input position.
383   // out point to the next character to be written.
384   template<typename dir_mer, typename in_dir_ptr, typename out_dir_ptr,
385            typename counter, typename elog>
extend(dir_mer mer,in_dir_ptr input,in_dir_ptr qual,counter pos,in_dir_ptr end,out_dir_ptr out,elog & log,const char ** error)386   char * extend(dir_mer mer, in_dir_ptr input, in_dir_ptr qual,
387                 counter pos, in_dir_ptr end,
388                 out_dir_ptr out, elog &log, const char** error) {
389     counter  cpos       = pos;
390     uint32_t prev_count = _ec.mer_database()->get_val(mer.canonical());
391 
392     for( ; input < end; ++input, ++qual) {
393       const char base = *input;
394 
395       if(base == '\n')
396         continue;
397       cpos = pos;
398       ++pos;
399 
400       const int ori_code = mer_dna::code(base);
401       mer.shift(ori_code >= 0 ? ori_code : 0);
402       if(ori_code >= 0) {
403         switch(check_contaminant(mer, log, cpos, error)) {
404         case OK: break;
405         case TRUNCATE: goto done;
406         case ERROR: return 0;
407         }
408       }
409       uint64_t counts[4];
410       int      ucode = 0;
411       int      level;
412 
413       const int count = _ec.mer_database()->get_best_alternatives(mer, counts, ucode, level);
414 
415       // No coninuation whatsoever, trim.
416       if(count == 0) {
417         log.truncation(cpos);
418         goto done;
419       }
420 
421       if(count == 1) { // One continuation. Is it an error?
422         prev_count = counts[ucode];
423         switch(log_substitution(mer, out, log, cpos, ori_code, ucode, error)) {
424         case OK: break;
425         case TRUNCATE: goto done;
426         case ERROR: return 0;
427         }
428         *out++ = mer.base(0);
429         continue;
430       }
431       // We get here if there is more than one alternative base to try
432       // at some level. Note that if the current base is low quality
433       // and all alternatives are higher quality, then the current
434       // base will have a count of zero. If the current base is non N
435       // and has a high count or previous count is low enough that
436       // continuity does not apply, output current base. But if the current
437       // base has count of zero, all alternatives are low quality and prev_count is low
438       // then trim
439       if(ori_code >= 0){ //if the current base is valid base (non N)
440 	if(counts[ori_code] > (uint64_t)_ec.min_count()) {
441           if(counts[ori_code] >= (uint32_t)_ec.cutoff() || *qual >= _ec.qual_cutoff()) {
442             *out++ = mer.base(0);
443             continue;
444           }
445           // Now we ask for a probability of getting
446           // counts[ori_code] errors with p=1/300 in sum_counts
447           // trials.  If this probability is < 10e-6, do not correct
448           double p = (double)(counts[0] + counts[1] + counts[2] + counts[3]) * _ec.collision_prob();
449           const double prob = poisson_term(p, counts[ori_code]);
450           if(prob < _ec.poisson_threshold()) {
451             *out++ = mer.base(0);
452             continue;
453           }
454 	} else if(level == 0 && counts[ori_code] == 0) {
455           // definitely an error and all alternatives are low quality
456           log.truncation(cpos);
457           goto done;
458 	}
459       } else if(level == 0) { //if all alternatives are low quality
460 	log.truncation(cpos);
461 	goto done;
462       }
463 
464       // We get here if there are multiple possible substitutions, the
465       // original count is low enough and the previous count is high (good) or
466       // the current base is an N
467       // We find out all alternative bases
468       // that have a continuation at the same or better level.  Then
469       // if the current base is N, pick the one with the highest
470       // count that is the most similar to the prev_count,
471       // otherwise pick the one with the most similar count.
472       // If no alternative continues, leave the base alone.
473       int          check_code               = ori_code;
474       bool         success                  = false;
475       uint64_t     cont_counts[4]; //here we record the counts for the continuations
476       bool         continue_with_correct_base[4];
477       int          read_nbase_code          = -1;
478       bool         candidate_continuations[4];
479       unsigned int ncandidate_continuations = 0;
480 
481       //here we determine what the next base in the read is
482       if(input + 1 < end)
483         read_nbase_code = mer_dna::code(*(input + 1));
484 
485       for(int i = 0; i < 4; ++i) {
486         cont_counts[i]                = 0;
487         continue_with_correct_base[i] = false;
488         if(counts[i] <= (uint64_t)_ec.min_count())
489           continue;
490         check_code = i;
491         // Check that it continues at least one more base with that quality
492         _tmp_mer     = mer.kmer();
493         dir_mer nmer = _tmp_mer;
494         nmer.replace(0, check_code);
495         // Does not matter what we shift, check all alternative anyway.
496         nmer.shift(0);
497 
498         uint64_t   ncounts[4];
499         int        nucode = 0;
500         int        nlevel;
501         const int ncount = _ec.mer_database()->get_best_alternatives(nmer, ncounts, nucode, nlevel);
502         if(ncount > 0 && nlevel >= level) {
503           continue_with_correct_base[i] = read_nbase_code >= 0 && ncounts[read_nbase_code] > 0;
504           success                       = true;
505           cont_counts[i]                = counts[i];
506         }
507       }
508 
509       if(success) {
510         // We found at least one alternative base that continues now
511         // we look for all alernatives that have a count closest to
512         // the previous count first we determine the count that is the
513         // closest to the current count but in the special case of
514         // prev_count == 1 we simply pick the largest count
515         check_code           = -1;
516         uint32_t _prev_count = prev_count<=(uint64_t)_ec.min_count() ? std::numeric_limits<uint32_t>::max() : prev_count;
517         int      min_diff    = std::numeric_limits<int>::max();
518         for(int  i = 0; i < 4; ++i) {
519           candidate_continuations[i] = false;
520           if(cont_counts[i] > 0)
521             min_diff = std::min(min_diff, (int)std::abs((long)cont_counts[i] - (long)_prev_count));
522         }
523 
524         //we now know the count that is the closest, now we determine how many alternatives have this count
525         for(uint32_t  i = 0; i < 4; i++) {
526           if(std::abs((long)cont_counts[i] - (long)_prev_count) == min_diff){
527             candidate_continuations[i] = true;
528             ++ncandidate_continuations;
529             check_code=i;
530           }
531         }
532 
533         //do we have more than one good candidate? if we do then check which one continues with the correct base
534         if(ncandidate_continuations>1 && read_nbase_code >= 0)
535           for(int  i = 0; i < 4; ++i){
536             if(candidate_continuations[i]){
537               if(!continue_with_correct_base[i])
538                 --ncandidate_continuations;
539               else
540                 check_code = i;
541             }
542           }
543 
544         //fail if we still have more than one candidate
545         if(ncandidate_continuations != 1)
546           check_code = -1;
547 
548         if(check_code >= 0) {
549           switch(log_substitution(mer, out, log, cpos, ori_code, check_code, error)) {
550           case OK: break;
551           case TRUNCATE: goto done;
552           case ERROR: return 0;
553           }
554         }
555       }
556       if(ori_code < 0 && check_code < 0) {// if invalid base and no good sub found
557         log.truncation(cpos);
558         goto done;
559       }
560       *out++ = mer.base(0);
561     }
562 
563   done:
564     return out.ptr();
565   }
566 
homo_trim(const char * start,char * out_start,char * out_end,forward_log & fwd_log,backward_log & bwd_log,const char ** error)567   char* homo_trim(const char* start, char* out_start, char* out_end,
568 		  forward_log& fwd_log, backward_log& bwd_log, const char** error) {
569     int   max_homo_score = std::numeric_limits<int>::min();
570     char* max_pos        = 0;
571     int   homo_score     = 0;
572     char* ptr            = out_end - 1;
573     char  pbase          = mer_dna::code(*ptr);
574 
575     for(--ptr; ptr >= out_start; --ptr) {
576       char cbase = mer_dna::code(*ptr);
577       homo_score += ((pbase == cbase) << 1) - 1; // Add 1 if same as last, -1 if not
578       pbase       = cbase;
579       if(homo_score > max_homo_score) {
580 	max_homo_score = homo_score;
581 	max_pos        = ptr;
582       }
583     }
584 
585     if(max_homo_score < _ec.homo_trim())
586       return out_end; // Not a high score -> return without truncation
587     // assert(max_pos >= out_start);
588     // assert(max_pos >= start);
589     if(max_pos < out_start) {
590       *error = error_homopolymer;
591       return 0;
592     }
593     fwd_log.force_truncate(forward_counter(max_pos - start));
594     bwd_log.force_truncate(backward_counter(max_pos - start));
595     fwd_log.truncation(forward_counter(max_pos - start));
596     return max_pos;
597   }
598 
insure_length_buffer(size_t len)599   void insure_length_buffer(size_t len) {
600     if(len <= _buff_size)
601       return;
602 
603     _buff_size = len > 2 * _buff_size ? len + 100 : 2 * _buff_size;
604     _buffer    = (char *)realloc(_buffer, _buff_size);
605     if(!_buffer)
606       throw std::runtime_error(err::msg() << "Buffer allocation failed, size " << _buffer << err::no);
607   }
608 
find_starting_mer(kmer_t & mer,const char * & input,const char * end,char * & out,const char ** error)609   bool find_starting_mer(kmer_t &mer, const char * &input, const char *end, char * &out,
610 			 const char** error) {
611     while(input < end) {
612       for(int i = 0; input < end && i < (int)mer_dna::k(); ++i) {
613 	char base = *input++;
614 	*out++ = base;
615 	if(!mer.shift_left(base))
616 	  i = -1;        // If an N, skip to next k-mer
617       }
618       int found = 0;
619       while(input < end) {
620 	bool contaminated = _ec.contaminant()->is_contaminant(mer.canonical(), _tmp_mer_dna);
621 	if(contaminated && !_ec.trim_contaminant()) {
622 	  *error = error_contaminant;
623 	  return false;
624 	}
625 
626 	if(!contaminated) {
627 	  hval_t val = _ec.mer_database()->get_val(mer.canonical());
628 
629 	  found = (int)val >= _ec.anchor() ? found + 1 : 0;
630 	  if(found >= _ec.good())
631 	    return true;
632 	}
633 
634 	char base = *input++;
635 	*out++ = base;
636 	if(!mer.shift_left(base))
637 	  break;
638       }
639     }
640 
641     *error = error_no_starting_mer;
642     return false;
643   }
644 };
645 
646 const char* error_correct_instance::error_contaminant     = "Contaminated read";
647 const char* error_correct_instance::error_no_starting_mer = "No high quality mer";
648 const char* error_correct_instance::error_homopolymer     = "Entire read is an homopolymer";
649 
compute_poisson_cutoff__(const val_array_raw & counts,double collision_prob,double poisson_threshold)650 unsigned int compute_poisson_cutoff__(const val_array_raw& counts, double collision_prob, double poisson_threshold) {
651   auto     it_end   = counts.end();
652   uint64_t distinct = 0;
653   uint64_t total    = 0;
654   for(auto it = counts.begin(); it != it_end; ++it) {
655     if((*it & 0x1) && (*it >= 2)) {
656       distinct += 1;
657       total    += *it >> 1;
658     }
659   }
660   const double coverage = (double)total / (double)distinct;
661   vlog << "distinct mers:" << distinct << " total mers:" << total << " estimated coverage:" << coverage;
662   const double lambda = coverage * collision_prob;
663   vlog << "lambda:" << lambda << " collision_prob:" << collision_prob << " poisson_threshold:" << poisson_threshold;
664   for(unsigned int x = 2; x < 1000; ++x)
665     if(poisson_term(lambda, x) < poisson_threshold)
666       return x + 1;
667   return 0;
668 }
669 
compute_poisson_cutoff(const val_array_raw & counts,double collision_prob,double poisson_threshold)670 unsigned int compute_poisson_cutoff(const val_array_raw& counts, double collision_prob, double poisson_threshold) {
671   vlog << "Computing Poisson cutoff";
672   unsigned int res = compute_poisson_cutoff__(counts, collision_prob, poisson_threshold);
673   return res;
674 }
675 
main(int argc,char * argv[])676 int main(int argc, char *argv[])
677 {
678   args.parse(argc, argv);
679 
680   if(args.qual_cutoff_char_given && args.qual_cutoff_char_arg.size() != 1)
681     args_t::error("The qual-cutoff-char must be one ASCII character.");
682   if(args.qual_cutoff_value_given && args.qual_cutoff_value_arg > (uint32_t)std::numeric_limits<char>::max())
683     args_t::error("The qual-cutoff-value must be in the range 0-127.");
684   const char qual_cutoff = args.qual_cutoff_char_given ? args.qual_cutoff_char_arg[0] :
685     (args.qual_cutoff_value_given ? (char)args.qual_cutoff_value_arg : std::numeric_limits<char>::max());
686 
687   verbose_log::verbose = args.verbose_flag;
688   vlog << "Loading mer database";
689   database_query mer_database(args.db_arg, args.no_mmap_flag);
690   mer_dna::k(mer_database.header().key_len() / 2);
691 
692   // Open contaminant database.
693   std::unique_ptr<contaminant_check> contaminant;
694   contaminant.reset(new contaminant_no_database());
695   if(args.contaminant_given) {
696     vlog << "Loading contaminant sequences";
697     std::ifstream contaminant_file(args.contaminant_arg);
698     if(!contaminant_file.good())
699       err::die(err::msg() << "Failed to open contaminant file '" << args.contaminant_arg << "'");
700     jellyfish::file_header header(contaminant_file);
701     if(header.format() != binary_dumper::format)
702       err::die(err::msg() << "Contaminant format expected '" << binary_dumper::format << "'");
703     if(mer_dna::k() * 2 != header.key_len())
704       err::die(err::msg() << "Contaminant mer length (" << (header.key_len() / 2)
705           << ") different than correction mer length (" << mer_dna::k() << ")");
706     binary_reader reader(contaminant_file, &header);
707     contaminant.reset(new contaminant_database(reader, header.size()));
708   }
709 
710   stream_manager streams(args.sequence_arg.cbegin(), args.sequence_arg.cend(), 1);
711 
712   const unsigned int cutoff =   args.cutoff_given ?
713     args.cutoff_arg :
714     compute_poisson_cutoff(mer_database.vals(), args.apriori_error_rate_arg / 3,
715                            args.poisson_threshold_arg / args.apriori_error_rate_arg);
716   vlog << "Using cutoff of " << cutoff;
717   if(cutoff == 0 && !args.cutoff_given)
718     err::die("Cutoff computation failed. Pass it explicitly with -p switch.");
719 
720   error_correct_instance::ec_t correct(args.thread_arg, streams);
721   correct.skip(args.skip_arg).good(args.good_arg)
722     .anchor(args.anchor_count_arg)
723     .prefix(args.output_given ? (std::string)args.output_arg : "")
724     .min_count(args.min_count_arg)
725     .cutoff(cutoff)
726     .qual_cutoff(qual_cutoff)
727     .window(args.window_arg)
728     .error(args.error_arg)
729     .gzip(args.gzip_flag)
730     .mer_database(&mer_database)
731     .contaminant(contaminant.get())
732     .trim_contaminant(args.trim_contaminant_flag)
733     .homo_trim(args.homo_trim_given ? args.homo_trim_arg : std::numeric_limits<int>::min())
734     .collision_prob(args.apriori_error_rate_arg / 3)
735     .poisson_threshold(args.poisson_threshold_arg)
736     .no_discard(args.no_discard_flag);
737   vlog << "Correcting reads";
738   correct.do_it(args.thread_arg);
739   vlog << "Done";
740 
741   return 0;
742  }
743 
744