1 /* This file is part of Jellyfish. 2 3 Jellyfish is free software: you can redistribute it and/or modify 4 it under the terms of the GNU General Public License as published by 5 the Free Software Foundation, either version 3 of the License, or 6 (at your option) any later version. 7 8 Jellyfish is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with Jellyfish. If not, see <http://www.gnu.org/licenses/>. 15 */ 16 17 #ifndef __JELLYFISH_MER_OVELAP_SEQUENCE_PARSER_H_ 18 #define __JELLYFISH_MER_OVELAP_SEQUENCE_PARSER_H_ 19 20 #include <stdint.h> 21 22 #include <memory> 23 24 #include <jellyfish/err.hpp> 25 #include <jellyfish/cooperative_pool2.hpp> 26 #include <jellyfish/cpp_array.hpp> 27 28 namespace jellyfish { 29 30 struct sequence_ptr { 31 char* start; 32 char* end; 33 }; 34 35 template<typename StreamIterator> 36 class mer_overlap_sequence_parser : public jellyfish::cooperative_pool2<mer_overlap_sequence_parser<StreamIterator>, sequence_ptr> { 37 typedef jellyfish::cooperative_pool2<mer_overlap_sequence_parser<StreamIterator>, sequence_ptr> super; 38 enum file_type { DONE_TYPE, FASTA_TYPE, FASTQ_TYPE }; 39 typedef std::unique_ptr<std::istream> stream_type; 40 41 struct stream_status { 42 char* seam; 43 size_t seq_len; 44 bool have_seam; 45 file_type type; 46 stream_type stream; 47 stream_statusjellyfish::mer_overlap_sequence_parser::stream_status48 stream_status() : seam(0), seq_len(0), have_seam(false), type(DONE_TYPE) { } 49 }; 50 51 uint16_t mer_len_; 52 size_t buf_size_; 53 char* buffer; 54 char* seam_buffer; 55 locks::pthread::mutex streams_mutex; 56 char* data; 57 cpp_array<stream_status> streams_; 58 StreamIterator& streams_iterator_; 59 size_t files_read_; // nb of files read 60 size_t reads_read_; // nb of reads read 61 62 public: 63 /// Max_producers is the maximum number of concurrent threads than 64 /// can produce data simultaneously. Size is the number of buffer to 65 /// keep around. It should be larger than the number of thread 66 /// expected to read from this class. buf_size is the size of each 67 /// buffer. A StreamIterator is expected to have a next() method, 68 /// which is thread safe, and which returns (move) a 69 /// std::unique<std::istream> object. mer_overlap_sequence_parser(uint16_t mer_len,uint32_t max_producers,uint32_t size,size_t buf_size,StreamIterator & streams)70 mer_overlap_sequence_parser(uint16_t mer_len, uint32_t max_producers, uint32_t size, size_t buf_size, 71 StreamIterator& streams) : 72 super(max_producers, size), 73 mer_len_(mer_len), 74 buf_size_(buf_size), 75 buffer(new char[size * buf_size]), 76 seam_buffer(new char[max_producers * (mer_len - 1)]), 77 streams_(max_producers), 78 streams_iterator_(streams), 79 files_read_(0), reads_read_(0) 80 { 81 for(sequence_ptr* it = super::element_begin(); it != super::element_end(); ++it) 82 it->start = it->end = buffer + (it - super::element_begin()) * buf_size; 83 for(uint32_t i = 0; i < max_producers; ++i) { 84 streams_.init(i); 85 streams_[i].seam = seam_buffer + i * (mer_len - 1); 86 open_next_file(streams_[i]); 87 } 88 } 89 ~mer_overlap_sequence_parser()90 ~mer_overlap_sequence_parser() { 91 delete [] buffer; 92 delete [] seam_buffer; 93 } 94 95 // file_type get_type() const { return type; } 96 produce(uint32_t i,sequence_ptr & buff)97 inline bool produce(uint32_t i, sequence_ptr& buff) { 98 stream_status& st = streams_[i]; 99 100 switch(st.type) { 101 case FASTA_TYPE: 102 read_fasta(st, buff); 103 break; 104 case FASTQ_TYPE: 105 read_fastq(st, buff); 106 break; 107 case DONE_TYPE: 108 return true; 109 } 110 111 if(st.stream->good()) 112 return false; 113 114 // Reach the end of file, close current and try to open the next one 115 st.have_seam = false; 116 open_next_file(st); 117 return false; 118 } 119 nb_files() const120 size_t nb_files() const { return files_read_; } nb_reads() const121 size_t nb_reads() const { return reads_read_; } 122 123 protected: open_next_file(stream_status & st)124 bool open_next_file(stream_status& st) { 125 // The stream must be released, with .reset(), before calling 126 // .next() on the streams_iterator_, to ensure that the 127 // streams_iterator_ noticed that we closed that stream before 128 // requesting a new one. 129 st.stream.reset(); 130 st.stream = streams_iterator_.next(); 131 if(!st.stream) { 132 st.type = DONE_TYPE; 133 return false; 134 } 135 136 ++files_read_; 137 switch(st.stream->peek()) { 138 case EOF: return open_next_file(st); 139 case '>': 140 st.type = FASTA_TYPE; 141 ignore_line(*st.stream); // Pass header 142 ++reads_read_; 143 break; 144 case '@': 145 st.type = FASTQ_TYPE; 146 ignore_line(*st.stream); // Pass header 147 ++reads_read_; 148 break; 149 default: 150 throw std::runtime_error("Unsupported format"); // Better error management 151 } 152 return true; 153 } 154 read_fasta(stream_status & st,sequence_ptr & buff)155 void read_fasta(stream_status& st, sequence_ptr& buff) { 156 size_t read = 0; 157 if(st.have_seam) { 158 memcpy(buff.start, st.seam, mer_len_ - 1); 159 read = mer_len_ - 1; 160 } 161 162 // Here, the current stream is assumed to always point to some 163 // sequence (or EOF). Never at header. 164 while(st.stream->good() && read < buf_size_ - mer_len_ - 1) { 165 read += read_sequence(*st.stream, read, buff.start, '>'); 166 if(st.stream->peek() == '>') { 167 *(buff.start + read++) = 'N'; // Add N between reads 168 ignore_line(*st.stream); // Skip to next sequence (skip headers, quals, ...) 169 ++reads_read_; 170 } 171 } 172 buff.end = buff.start + read; 173 174 st.have_seam = read >= (size_t)(mer_len_ - 1); 175 if(st.have_seam) 176 memcpy(st.seam, buff.end - mer_len_ + 1, mer_len_ - 1); 177 } 178 read_fastq(stream_status & st,sequence_ptr & buff)179 void read_fastq(stream_status& st, sequence_ptr& buff) { 180 size_t read = 0; 181 if(st.have_seam) { 182 memcpy(buff.start, st.seam, mer_len_ - 1); 183 read = mer_len_ - 1; 184 } 185 186 // Here, the st.stream is assumed to always point to some 187 // sequence (or EOF). Never at header. 188 while(st.stream->good() && read < buf_size_ - mer_len_ - 1) { 189 size_t nread = read_sequence(*st.stream, read, buff.start, '+'); 190 read += nread; 191 st.seq_len += nread; 192 if(st.stream->peek() == '+') { 193 skip_quals(*st.stream, st.seq_len); 194 if(st.stream->good()) { 195 *(buff.start + read++) = 'N'; // Add N between reads 196 ignore_line(*st.stream); // Skip sequence header 197 ++reads_read_; 198 } 199 st.seq_len = 0; 200 } 201 } 202 buff.end = buff.start + read; 203 204 st.have_seam = read >= (size_t)(mer_len_ - 1); 205 if(st.have_seam) 206 memcpy(st.seam, buff.end - mer_len_ + 1, mer_len_ - 1); 207 } 208 read_sequence(std::istream & is,const size_t read,char * const start,const char stop)209 size_t read_sequence(std::istream& is, const size_t read, char* const start, const char stop) { 210 size_t nread = read; 211 212 skip_newlines(is); // Skip new lines -> get below doesn't like them 213 while(is && nread < buf_size_ - 1 && is.peek() != stop) { 214 is.get(start + nread, buf_size_ - nread); 215 nread += is.gcount(); 216 skip_newlines(is); 217 } 218 return nread - read; 219 } 220 ignore_line(std::istream & is)221 inline void ignore_line(std::istream& is) { 222 is.ignore(std::numeric_limits<std::streamsize>::max(), '\n'); 223 } 224 skip_newlines(std::istream & is)225 inline void skip_newlines(std::istream& is) { 226 while(is.peek() == '\n') 227 is.get(); 228 } 229 230 // Skip quals header and qual values (read_len) of them. skip_quals(std::istream & is,size_t read_len)231 void skip_quals(std::istream& is, size_t read_len) { 232 ignore_line(is); 233 size_t quals = 0; 234 235 skip_newlines(is); 236 while(is.good() && quals < read_len) { 237 is.ignore(read_len - quals + 1, '\n'); 238 quals += is.gcount(); 239 if(is) 240 ++read_len; 241 skip_newlines(is); 242 } 243 skip_newlines(is); 244 if(quals == read_len && (is.peek() == '@' || is.peek() == EOF)) 245 return; 246 247 throw std::runtime_error("Invalid fastq sequence"); 248 } 249 peek(std::istream & is)250 char peek(std::istream& is) { return is.peek(); } 251 }; 252 } 253 254 #endif /* __JELLYFISH_MER_OVELAP_SEQUENCE_PARSER_H_ */ 255