1 /****
2 DIAMOND protein aligner
3 Copyright (C) 2013-2021 Max Planck Society for the Advancement of Science e.V.
4                         Benjamin Buchfink
5                         Eberhard Karls Universitaet Tuebingen
6 
7 Code developed by Benjamin Buchfink <benjamin.buchfink@tue.mpg.de>
8 
9 This program is free software: you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation, either version 3 of the License, or
12 (at your option) any later version.
13 
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 GNU General Public License for more details.
18 
19 You should have received a copy of the GNU General Public License
20 along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 ****/
22 
23 #include <iostream>
24 #include <thread>
25 #include "sequence_file.h"
26 #include "../masking/masking.h"
27 #include "reference.h"
28 #include "dmnd/dmnd.h"
29 #include "../util/system/system.h"
30 #include "../util/util.h"
31 #include "../util/algo/partition.h"
32 #include "../util/sequence/sequence.h"
33 #include "../util/parallel/multiprocessing.h"
34 #include "../basic/config.h"
35 #ifdef WITH_BLASTDB
36 #include "blastdb/blastdb.h"
37 #endif
38 
39 using std::cout;
40 using std::endl;
41 using std::setw;
42 using std::thread;
43 
44 const EMap<SequenceFile::Type> EnumTraits<SequenceFile::Type>::to_string = { {SequenceFile::Type::DMND, "Diamond database" }, {SequenceFile::Type::BLAST, "BLAST database"} };
45 
dict_file_name(const size_t query_block,const size_t target_block)46 static string dict_file_name(const size_t query_block, const size_t target_block) {
47 	const string file_name = append_label("ref_dict_", query_block) + append_label("_", target_block);
48 	return join_path(config.parallel_tmpdir, file_name);
49 }
50 
single_oid(const SequenceFile * f,const string & acc)51 static size_t single_oid(const SequenceFile* f, const string& acc) {
52 	const vector<int> oid = f->accession_to_oid(acc);
53 	if (oid.empty())
54 		throw AccessionNotFound();
55 	if (oid.size() > 1)
56 		throw std::runtime_error("Multiple oids for target accession: " + acc);
57 	return oid.front();
58 }
59 
load_block(size_t block_id_begin,size_t block_id_end,size_t pos,bool use_filter,const vector<uint64_t> * filtered_pos,bool load_ids,Block * block)60 void SequenceFile::load_block(size_t block_id_begin, size_t block_id_end, size_t pos, bool use_filter, const vector<uint64_t>* filtered_pos, bool load_ids, Block* block) {
61 	static const size_t MAX_LOAD_SIZE = 2 * GIGABYTES;
62 	seek_offset(pos);
63 	size_t load_size = 0;
64 	for (size_t i = block_id_begin; i < block_id_end; ++i) {
65 		bool seek = false;
66 		if (use_filter && (*filtered_pos)[i]) {
67 			pos = (*filtered_pos)[i];
68 			seek = true;
69 		}
70 		const size_t l = block->seqs_.length(i);
71 		load_size += l;
72 		read_seq_data(block->seqs_.ptr(i), l, pos, seek);
73 		if (load_ids)
74 			read_id_data(pos - 1, block->ids_.ptr(i), block->ids_.length(i));
75 		else
76 			skip_id_data();
77 		if (type_ == Type::DMND)
78 			Masking::get().remove_bit_mask(block->seqs_.ptr(i), block->seqs_.length(i));
79 		if (load_size > MAX_LOAD_SIZE) {
80 			close_weakly();
81 			reopen();
82 			load_size = 0;
83 		}
84 	}
85 }
86 
dict_block(const size_t ref_block)87 size_t SequenceFile::dict_block(const size_t ref_block)
88 {
89 	return config.multiprocessing ? ref_block : 0;
90 }
91 
load_seqs(const size_t max_letters,bool load_ids,const BitVector * filter,bool fetch_seqs,bool lazy_masking,const Chunk & chunk)92 Block* SequenceFile::load_seqs(const size_t max_letters, bool load_ids, const BitVector* filter, bool fetch_seqs, bool lazy_masking, const Chunk& chunk)
93 {
94 	task_timer timer("Loading reference sequences");
95 	reopen();
96 
97 	if(max_letters == 0)
98 		seek_chunk(chunk);
99 	init_seqinfo_access();
100 
101 	size_t database_id = tell_seq();
102 	size_t letters = 0, seqs = 0, id_letters = 0, seqs_processed = 0, filtered_seq_count = 0;
103 	vector<uint64_t> filtered_pos;
104 	Block* block = new Block(alphabet_);
105 
106 	SeqInfo r = read_seqinfo();
107 	size_t offset = r.pos;
108 	bool last = false;
109 	if (type() == Type::BLAST && sequence_count() != sparse_sequence_count())
110 		filter = builtin_filter();
111 	const bool use_filter = filter && !filter->empty();
112 
113 	auto goon = [&]() {
114 		if (max_letters > 0)
115 			return (r.seq_len > 0 && letters < max_letters);
116 		else
117 			return (seqs < chunk.n_seqs);
118 	};
119 
120 	while (goon()) {
121 		SeqInfo r_next = read_seqinfo();
122 		if (!use_filter || filter->get(database_id)) {
123 			letters += r.seq_len;
124 			if (fetch_seqs) {
125 				block->seqs_.reserve(r.seq_len);
126 			}
127 			if (load_ids) {
128 				const size_t id_len = this->id_len(r, r_next);
129 				id_letters += id_len;
130 				if (fetch_seqs)
131 					block->ids_.reserve(id_len);
132 			}
133 			++filtered_seq_count;
134 			block->block2oid_.push_back((unsigned)database_id);
135 			if (use_filter) {
136 				filtered_pos.push_back(last ? 0 : r.pos);
137 			}
138 			last = true;
139 		}
140 		else {
141 			last = false;
142 		}
143 		++database_id;
144 		++seqs_processed;
145 		r = r_next;
146 		++seqs;
147 	}
148 
149 	putback_seqinfo();
150 
151 	if (seqs == 0 || filtered_seq_count == 0)
152 		return block;
153 
154 	if (fetch_seqs) {
155 		block->seqs_.finish_reserve();
156 		if (load_ids) block->ids_.finish_reserve();
157 		if (false && type_ == Type::BLAST && config.algo == Config::Algo::QUERY_INDEXED && config.threads_ > 1 && !use_filter) {
158 			assert(!use_filter);
159 			Partition<size_t> p(filtered_seq_count, config.threads_);
160 			vector<std::thread> t;
161 			for (size_t i = 0; i < p.parts; ++i)
162 				t.emplace_back(&SequenceFile::load_block, this, p.begin(i), p.end(i), offset + p.begin(i), use_filter, &filtered_pos, false, block);
163 			for (std::thread& i : t)
164 				i.join();
165 		}
166 		else
167 			load_block(0, filtered_seq_count, offset, use_filter, &filtered_pos, load_ids, block);
168 
169 		timer.finish();
170 		block->seqs_.print_stats();
171 	}
172 
173 	if (config.multiprocessing || config.global_ranking_targets)
174 		blocked_processing = true;
175 	else
176 		blocked_processing = seqs_processed < sequence_count();
177 
178 	if (blocked_processing) // should be always
179 		close_weakly();
180 
181 	if (lazy_masking)
182 		block->masked_.resize(filtered_seq_count, false);
183 
184 	return block;
185 }
186 
get_seq()187 void SequenceFile::get_seq()
188 {
189 	std::map<string, string> seq_titles;
190 	if (!config.query_file.empty()) {
191 		TextInputFile list(config.single_query_file());
192 		while (list.getline(), !list.eof()) {
193 			const vector<string> t(tokenize(list.line.c_str(), "\t"));
194 			if (t.size() != 2)
195 				throw std::runtime_error("Query file format error.");
196 			seq_titles[t[0]] = t[1];
197 		}
198 		list.close();
199 	}
200 
201 	vector<Letter> seq;
202 	string id;
203 	bool all = config.seq_no.size() == 0 && seq_titles.empty();
204 	std::set<size_t> seqs;
205 	if (!all)
206 		for (vector<string>::const_iterator i = config.seq_no.begin(); i != config.seq_no.end(); ++i)
207 			seqs.insert(atoi(i->c_str()) - 1);
208 	const size_t max_letters = config.chunk_size == 0.0 ? std::numeric_limits<size_t>::max() : (size_t)(config.chunk_size * 1e9);
209 	size_t letters = 0;
210 	TextBuffer buf;
211 	OutputFile out(config.output_file);
212 	for (size_t n = 0; n < sequence_count(); ++n) {
213 		read_seq(seq, id);
214 		std::map<string, string>::const_iterator mapped_title = seq_titles.find(Util::Seq::seqid(id.c_str(), false));
215 		if (all || seqs.find(n) != seqs.end() || mapped_title != seq_titles.end()) {
216 			buf << '>' << (mapped_title != seq_titles.end() ? mapped_title->second : id) << '\n';
217 			if (config.reverse) {
218 				Sequence(seq).print(buf, value_traits, Sequence::Reversed());
219 				buf << '\n';
220 			}
221 			else if (config.hardmasked) {
222 				Sequence(seq).print(buf, value_traits, Sequence::Hardmasked());
223 				buf << '\n';
224 			}
225 			else
226 				buf << Sequence(seq) << '\n';
227 		}
228 		out.write(buf.data(), buf.size());
229 		letters += seq.size();
230 		if (letters >= max_letters)
231 			break;
232 		seq.clear();
233 		id.clear();
234 		buf.clear();
235 	}
236 
237 	out.close();
238 }
239 
~SequenceFile()240 SequenceFile::~SequenceFile()
241 {
242 	if (dict_file_) {
243 		dict_file_->close();
244 		dict_file_.reset();
245 	}
246 }
247 
auto_create(string & path,Flags flags,Metadata metadata)248 SequenceFile* SequenceFile::auto_create(string& path, Flags flags, Metadata metadata) {
249 	if (exists(path + ".pin") || exists(path + ".pal")) {
250 #ifdef WITH_BLASTDB
251 		if (config.multiprocessing)
252 			throw std::runtime_error("--multiprocessing is not compatible with BLAST databases.");
253 		if (config.target_indexed)
254 			throw std::runtime_error("--target-indexed is not compatible with BLAST databases.");
255 		return new BlastDB(path, metadata, flags);
256 #else
257 		throw std::runtime_error("This executable was not compiled with support for BLAST databases.");
258 #endif
259 	}
260 	path = auto_append_extension_if_exists(path, DatabaseFile::FILE_EXTENSION);
261 	if (DatabaseFile::is_diamond_db(path)) {
262 		return new DatabaseFile(path, metadata, flags);
263 	}
264 	else if (!flag_any(flags, Flags::NO_FASTA)) {
265 		message_stream << "Database file is not a DIAMOND or BLAST database, treating as FASTA." << std::endl;
266 		config.input_ref_file = { path };
267 		TempFile* db;
268 		DatabaseFile::make_db(&db);
269 		DatabaseFile* r(new DatabaseFile(*db));
270 		delete db;
271 		return r;
272 	}
273 	throw std::runtime_error("Database does not have a supported format.");
274 }
275 
load_dict_block(InputFile * f,const size_t ref_block)276 void SequenceFile::load_dict_block(InputFile* f, const size_t ref_block)
277 {
278 	while (load_dict_entry(*f, ref_block));
279 }
280 
load_dictionary(const size_t query_block,const size_t ref_blocks)281 void SequenceFile::load_dictionary(const size_t query_block, const size_t ref_blocks)
282 {
283 	if (!dict_file_ && !config.multiprocessing)
284 		return;
285 	task_timer timer("Loading dictionary", 3);
286 	if (config.multiprocessing) {
287 		dict_oid_ = vector<vector<uint32_t>>(ref_blocks);
288 		if (flag_any(flags_, Flags::SELF_ALN_SCORES))
289 			dict_self_aln_score_ = vector<vector<double>>(ref_blocks);
290 		reserve_dict(ref_blocks);
291 		for (size_t i = 0; i < ref_blocks; ++i) {
292 			InputFile f(dict_file_name(query_block, i));
293 			load_dict_block(&f, i);
294 			f.close_and_delete();
295 		}
296 	}
297 	else {
298 		TempFile* t = dynamic_cast<TempFile*>(dict_file_.get());
299 		if (!t)
300 			throw std::runtime_error("Failed to load dictionary file.");
301 		dict_oid_ = { {} };
302 		dict_oid_.front().reserve(next_dict_id_);
303 		if (flag_any(flags_, Flags::SELF_ALN_SCORES)) {
304 			dict_self_aln_score_ = { {} };
305 			dict_self_aln_score_.front().reserve(next_dict_id_);
306 		}
307 		reserve_dict(0);
308 		InputFile f(*t);
309 		load_dict_block(&f, 0);
310 		if (dict_oid_.front().size() != next_dict_id_)
311 			throw std::runtime_error("Dictionary corrupted.");
312 		f.close_and_delete();
313 		dict_file_.reset();
314 	}
315 }
316 
free_dictionary()317 void SequenceFile::free_dictionary()
318 {
319 	dict_oid_.clear();
320 	dict_oid_.shrink_to_fit();
321 }
322 
total_blocks() const323 size_t SequenceFile::total_blocks() const {
324 	const size_t c = config.chunk_size * 1e9;
325 	return (this->letters() + c - 1) / c;
326 }
327 
seqs_by_accession(const std::vector<std::string>::const_iterator begin,const std::vector<std::string>::const_iterator end) const328 SequenceSet SequenceFile::seqs_by_accession(const std::vector<std::string>::const_iterator begin, const std::vector<std::string>::const_iterator end) const
329 {
330 	SequenceSet out(Alphabet::NCBI);
331 	vector<size_t> oids;
332 	oids.reserve(end - begin);
333 	for (auto it = begin; it != end; ++it) {
334 		try {
335 			const size_t oid = single_oid(this, *it);
336 			oids.push_back(oid);
337 			out.reserve(seq_length(oid));
338 		}
339 		catch (AccessionNotFound&) {
340 			out.reserve(0);
341 			oids.push_back(SIZE_MAX);
342 		}
343 	}
344 	out.finish_reserve();
345 	vector<Letter> seq;
346 	for (size_t i = 0; i < oids.size(); ++i) {
347 		if (oids[i] == SIZE_MAX)
348 			continue;
349 		seq_data(oids[i], seq);
350 		out.assign(i, seq.begin(), seq.end());
351 		out.convert_to_std_alph(i);
352 	}
353 	out.alphabet() = Alphabet::STD;
354 	return out;
355 }
356 
seq_by_accession(const std::string & acc) const357 std::vector<Letter> SequenceFile::seq_by_accession(const std::string& acc) const
358 {
359 	const size_t oid = single_oid(this, acc);
360 	vector<Letter> seq;
361 	seq_data(oid, seq);
362 	alph_ncbi_to_std(seq.begin(), seq.end());
363 	return seq;
364 }
365 
init_dict(const size_t query_block,const size_t target_block)366 void SequenceFile::init_dict(const size_t query_block, const size_t target_block)
367 {
368 	if (dict_file_)
369 		dict_file_->close();
370 	dict_file_.reset(config.multiprocessing ? new OutputFile(dict_file_name(query_block, target_block)) : new TempFile());
371 	next_dict_id_ = 0;
372 	dict_alloc_size_ = 0;
373 	block_to_dict_id_.clear();
374 }
375 
init_dict_block(size_t block,size_t seq_count,bool persist)376 void SequenceFile::init_dict_block(size_t block, size_t seq_count, bool persist)
377 {
378 	if(!persist)
379 		block_to_dict_id_.clear();
380 	if(block_to_dict_id_.find(block) == block_to_dict_id_.end())
381 		block_to_dict_id_[block] = vector<uint32_t>(seq_count, DICT_EMPTY);
382 }
383 
close_dict_block(bool persist)384 void SequenceFile::close_dict_block(bool persist)
385 {
386 	if (config.multiprocessing) {
387 		dict_file_->close();
388 		dict_file_.reset();
389 	}
390 	if (!persist)
391 		block_to_dict_id_.clear();
392 }
393 
dict_id(size_t block,size_t block_id,size_t oid,size_t len,const char * id,const Letter * seq,const double self_aln_score)394 uint32_t SequenceFile::dict_id(size_t block, size_t block_id, size_t oid, size_t len, const char* id, const Letter* seq, const double self_aln_score)
395 {
396 	auto it = block_to_dict_id_.find(block);
397 	if (it == block_to_dict_id_.end() || block_id >= it->second.size())
398 		throw std::runtime_error("Dictionary not initialized.");
399 	vector<uint32_t>& v = it->second;
400 	uint32_t n = v[block_id];
401 	if (n != DICT_EMPTY)
402 		return n;
403 	{
404 		std::lock_guard<std::mutex> lock(dict_mtx_);
405 		n = v[block_id];
406 		if (n != DICT_EMPTY)
407 			return n;
408 		n = next_dict_id_++;
409 		v[block_id] = n;
410 		write_dict_entry(block, oid, len, id, seq, self_aln_score);
411 		return n;
412 	}
413 }
414 
oid(uint32_t dict_id,const size_t ref_block) const415 size_t SequenceFile::oid(uint32_t dict_id, const size_t ref_block) const
416 {
417 	const size_t b = dict_block(ref_block);
418 	if (b >= dict_oid_.size() || dict_id >= dict_oid_[b].size())
419 		throw std::runtime_error("Dictionary not loaded.");
420 	return dict_oid_[b][dict_id];
421 }
422 
dict_self_aln_score(const size_t dict_id,const size_t ref_block) const423 double SequenceFile::dict_self_aln_score(const size_t dict_id, const size_t ref_block) const
424 {
425 	const size_t b = dict_block(ref_block);
426 	if (b >= dict_self_aln_score_.size() || dict_id >= dict_self_aln_score_[b].size())
427 		throw std::runtime_error("Dictionary not loaded.");
428 	return dict_self_aln_score_[b][dict_id];
429 }
430 
SequenceFile(Type type,Alphabet alphabet,Flags flags)431 SequenceFile::SequenceFile(Type type, Alphabet alphabet, Flags flags):
432 	flags_(flags),
433 	type_(type),
434 	alphabet_(alphabet)
435 {}
436 
db_info()437 void db_info() {
438 	if (config.database.empty())
439 		throw std::runtime_error("Missing option for database file: --db/-d.");
440 	SequenceFile* db = SequenceFile::auto_create(config.database, SequenceFile::Flags::NO_FASTA | SequenceFile::Flags::NO_COMPATIBILITY_CHECK);
441 	const std::streamsize w = 25;
442 	cout << setw(w) << "Database type  " << to_string(db->type()) << endl;
443 	cout << setw(w) << "Database format version  " << db->db_version() << endl;
444 	if(db->type() == SequenceFile::Type::DMND)
445 		cout << setw(w) << "Diamond build  " << db->program_build_version() << endl;
446 	cout << setw(w) << "Sequences  " << db->sequence_count() << endl;
447 	if (db->type() == SequenceFile::Type::BLAST && db->sequence_count() != db->sparse_sequence_count())
448 		cout << setw(w) << "Sequences (filtered) " << db->sparse_sequence_count() << endl;
449 	cout << setw(w) << "Letters  " << db->letters() << endl;
450 	db->close();
451 	delete db;
452 }