1 /*  This file is part of Jellyfish.
2 
3     Jellyfish is free software: you can redistribute it and/or modify
4     it under the terms of the GNU General Public License as published by
5     the Free Software Foundation, either version 3 of the License, or
6     (at your option) any later version.
7 
8     Jellyfish is distributed in the hope that it will be useful,
9     but WITHOUT ANY WARRANTY; without even the implied warranty of
10     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11     GNU General Public License for more details.
12 
13     You should have received a copy of the GNU General Public License
14     along with Jellyfish.  If not, see <http://www.gnu.org/licenses/>.
15 */
16 
17 #ifndef __JELLYFISH_MER_OVELAP_SEQUENCE_PARSER_H_
18 #define __JELLYFISH_MER_OVELAP_SEQUENCE_PARSER_H_
19 
20 #include <stdint.h>
21 
22 #include <memory>
23 
24 #include <jellyfish/err.hpp>
25 #include <jellyfish/cooperative_pool2.hpp>
26 #include <jellyfish/cpp_array.hpp>
27 
28 namespace jellyfish {
29 
30 struct sequence_ptr {
31   char* start;
32   char* end;
33 };
34 
35 template<typename StreamIterator>
36 class mer_overlap_sequence_parser : public jellyfish::cooperative_pool2<mer_overlap_sequence_parser<StreamIterator>, sequence_ptr> {
37   typedef jellyfish::cooperative_pool2<mer_overlap_sequence_parser<StreamIterator>, sequence_ptr> super;
38   enum file_type { DONE_TYPE, FASTA_TYPE, FASTQ_TYPE };
39   typedef std::unique_ptr<std::istream> stream_type;
40 
41   struct stream_status {
42     char*       seam;
43     size_t      seq_len;
44     bool        have_seam;
45     file_type   type;
46     stream_type stream;
47 
stream_statusjellyfish::mer_overlap_sequence_parser::stream_status48     stream_status() : seam(0), seq_len(0), have_seam(false), type(DONE_TYPE) { }
49   };
50 
51   uint16_t                 mer_len_;
52   size_t                   buf_size_;
53   char*                    buffer;
54   char*                    seam_buffer;
55   locks::pthread::mutex    streams_mutex;
56   char*                    data;
57   cpp_array<stream_status> streams_;
58   StreamIterator&          streams_iterator_;
59   size_t                   files_read_; // nb of files read
60   size_t                   reads_read_; // nb of reads read
61 
62 public:
63   /// Max_producers is the maximum number of concurrent threads than
64   /// can produce data simultaneously. Size is the number of buffer to
65   /// keep around. It should be larger than the number of thread
66   /// expected to read from this class. buf_size is the size of each
67   /// buffer. A StreamIterator is expected to have a next() method,
68   /// which is thread safe, and which returns (move) a
69   /// std::unique<std::istream> object.
mer_overlap_sequence_parser(uint16_t mer_len,uint32_t max_producers,uint32_t size,size_t buf_size,StreamIterator & streams)70   mer_overlap_sequence_parser(uint16_t mer_len, uint32_t max_producers, uint32_t size, size_t buf_size,
71                               StreamIterator& streams) :
72     super(max_producers, size),
73     mer_len_(mer_len),
74     buf_size_(buf_size),
75     buffer(new char[size * buf_size]),
76     seam_buffer(new char[max_producers * (mer_len - 1)]),
77     streams_(max_producers),
78     streams_iterator_(streams),
79     files_read_(0), reads_read_(0)
80   {
81     for(sequence_ptr* it = super::element_begin(); it != super::element_end(); ++it)
82       it->start = it->end = buffer + (it - super::element_begin()) * buf_size;
83     for(uint32_t i = 0; i < max_producers; ++i) {
84       streams_.init(i);
85       streams_[i].seam = seam_buffer + i * (mer_len - 1);
86       open_next_file(streams_[i]);
87     }
88   }
89 
~mer_overlap_sequence_parser()90   ~mer_overlap_sequence_parser() {
91     delete [] buffer;
92     delete [] seam_buffer;
93   }
94 
95   //  file_type get_type() const { return type; }
96 
produce(uint32_t i,sequence_ptr & buff)97   inline bool produce(uint32_t i, sequence_ptr& buff) {
98     stream_status& st = streams_[i];
99 
100     switch(st.type) {
101     case FASTA_TYPE:
102       read_fasta(st, buff);
103       break;
104     case FASTQ_TYPE:
105       read_fastq(st, buff);
106       break;
107     case DONE_TYPE:
108       return true;
109     }
110 
111     if(st.stream->good())
112       return false;
113 
114     // Reach the end of file, close current and try to open the next one
115     st.have_seam = false;
116     open_next_file(st);
117     return false;
118   }
119 
nb_files() const120   size_t nb_files() const { return files_read_; }
nb_reads() const121   size_t nb_reads() const { return reads_read_; }
122 
123 protected:
open_next_file(stream_status & st)124   bool open_next_file(stream_status& st) {
125     // The stream must be released, with .reset(), before calling
126     // .next() on the streams_iterator_, to ensure that the
127     // streams_iterator_ noticed that we closed that stream before
128     // requesting a new one.
129     st.stream.reset();
130     st.stream = streams_iterator_.next();
131     if(!st.stream) {
132       st.type = DONE_TYPE;
133       return false;
134     }
135 
136     ++files_read_;
137     switch(st.stream->peek()) {
138     case EOF: return open_next_file(st);
139     case '>':
140       st.type = FASTA_TYPE;
141       ignore_line(*st.stream); // Pass header
142       ++reads_read_;
143       break;
144     case '@':
145       st.type = FASTQ_TYPE;
146       ignore_line(*st.stream); // Pass header
147       ++reads_read_;
148       break;
149     default:
150       throw std::runtime_error("Unsupported format"); // Better error management
151     }
152     return true;
153   }
154 
read_fasta(stream_status & st,sequence_ptr & buff)155   void read_fasta(stream_status& st, sequence_ptr& buff) {
156     size_t read = 0;
157     if(st.have_seam) {
158       memcpy(buff.start, st.seam, mer_len_ - 1);
159       read = mer_len_ - 1;
160     }
161 
162     // Here, the current stream is assumed to always point to some
163     // sequence (or EOF). Never at header.
164     while(st.stream->good() && read < buf_size_ - mer_len_ - 1) {
165       read += read_sequence(*st.stream, read, buff.start, '>');
166       if(st.stream->peek() == '>') {
167         *(buff.start + read++) = 'N'; // Add N between reads
168         ignore_line(*st.stream); // Skip to next sequence (skip headers, quals, ...)
169         ++reads_read_;
170       }
171     }
172     buff.end = buff.start + read;
173 
174     st.have_seam = read >= (size_t)(mer_len_ - 1);
175     if(st.have_seam)
176       memcpy(st.seam, buff.end - mer_len_ + 1, mer_len_ - 1);
177   }
178 
read_fastq(stream_status & st,sequence_ptr & buff)179   void read_fastq(stream_status& st, sequence_ptr& buff) {
180     size_t read = 0;
181     if(st.have_seam) {
182       memcpy(buff.start, st.seam, mer_len_ - 1);
183       read = mer_len_ - 1;
184     }
185 
186     // Here, the st.stream is assumed to always point to some
187     // sequence (or EOF). Never at header.
188     while(st.stream->good() && read < buf_size_ - mer_len_ - 1) {
189       size_t nread  = read_sequence(*st.stream, read, buff.start, '+');
190       read         += nread;
191       st.seq_len   += nread;
192       if(st.stream->peek() == '+') {
193         skip_quals(*st.stream, st.seq_len);
194         if(st.stream->good()) {
195           *(buff.start + read++) = 'N'; // Add N between reads
196           ignore_line(*st.stream); // Skip sequence header
197           ++reads_read_;
198         }
199         st.seq_len = 0;
200       }
201     }
202     buff.end = buff.start + read;
203 
204     st.have_seam = read >= (size_t)(mer_len_ - 1);
205     if(st.have_seam)
206       memcpy(st.seam, buff.end - mer_len_ + 1, mer_len_ - 1);
207   }
208 
read_sequence(std::istream & is,const size_t read,char * const start,const char stop)209   size_t read_sequence(std::istream& is, const size_t read, char* const start, const char stop) {
210     size_t nread = read;
211 
212     skip_newlines(is); // Skip new lines -> get below doesn't like them
213     while(is && nread < buf_size_ - 1 && is.peek() != stop) {
214       is.get(start + nread, buf_size_ - nread);
215       nread += is.gcount();
216       skip_newlines(is);
217     }
218     return nread - read;
219   }
220 
ignore_line(std::istream & is)221   inline void ignore_line(std::istream& is) {
222     is.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
223   }
224 
skip_newlines(std::istream & is)225   inline void skip_newlines(std::istream& is) {
226     while(is.peek() == '\n')
227       is.get();
228   }
229 
230   // Skip quals header and qual values (read_len) of them.
skip_quals(std::istream & is,size_t read_len)231   void skip_quals(std::istream& is, size_t read_len) {
232     ignore_line(is);
233     size_t quals = 0;
234 
235     skip_newlines(is);
236     while(is.good() && quals < read_len) {
237       is.ignore(read_len - quals + 1, '\n');
238       quals += is.gcount();
239       if(is)
240         ++read_len;
241       skip_newlines(is);
242     }
243     skip_newlines(is);
244     if(quals == read_len && (is.peek() == '@' || is.peek() == EOF))
245       return;
246 
247     throw std::runtime_error("Invalid fastq sequence");
248   }
249 
peek(std::istream & is)250   char peek(std::istream& is) { return is.peek(); }
251 };
252 }
253 
254 #endif /* __JELLYFISH_MER_OVELAP_SEQUENCE_PARSER_H_ */
255