1 #include "Aligner.h"
2 #include "Common/Options.h"
3 #include "DataLayer/Options.h"
4 #include "KAligner/Options.h"
5 #include "FastaReader.h"
6 #include "Iterator.h"
7 #include "IOUtil.h"
8 #include "MemoryUtil.h"
9 #include "SAM.h"
10 #include "StringUtil.h" // for toSI
11 #include "Uncompress.h"
12 #include "Pipe.h"
13 #include "PipeMux.h"
14 #include <algorithm>
15 #include <cassert>
16 #include <cctype>
17 #include <cerrno>
18 #include <cstdlib>
19 #include <cstring>
20 #include <fstream>
21 #include <getopt.h>
22 #include <iostream>
23 #include <pthread.h>
24 #include <sstream>
25 #include <string>
26 #include <sys/stat.h>
27 #include <sys/time.h>
28 
29 using namespace std;
30 
31 #define PROGRAM "KAligner"
32 
33 static const char VERSION_MESSAGE[] =
34 PROGRAM " (" PACKAGE_NAME ") " VERSION "\n"
35 "Written by Jared Simpson and Shaun Jackman.\n"
36 "\n"
37 "Copyright 2014 Canada's Michael Smith Genome Sciences Centre\n";
38 
39 static const char USAGE_MESSAGE[] =
40 "Usage: " PROGRAM " -k<kmer> [OPTION]... QUERY... TARGET\n"
41 "Align the sequences of the files QUERY to those of TARGET.\n"
42 "All perfect matches of at least k bases will be found.\n"
43 "\n"
44 " Options:\n"
45 "\n"
46 "  -k, -l, --kmer=N      k-mer size and minimum alignment length\n"
47 "  -s, --section=S/N     split the target into N sections and align\n"
48 "                        reads to section S [1/1]\n"
49 "  -i, --ignore-multimap ignore duplicate k-mer in the target\n"
50 "                        [default]\n"
51 "  -m, --multimap        allow duplicate k-mer in the target\n"
52 "      --no-multimap     disallow duplicate k-mer in the target\n"
53 "  -j, --threads=N       use N threads [2] up to one per query file\n"
54 "                        or if N is 0 use one thread per query file\n"
55 "  -v, --verbose         display verbose output\n"
56 "      --no-sam          output the results in KAligner format\n"
57 "      --sam             output the results in SAM format\n"
58 "      --seq             print the sequence with the alignments\n"
59 "      --help            display this help and exit\n"
60 "      --version         output version information and exit\n"
61 "\n"
62 "Report bugs to <" PACKAGE_BUGREPORT ">.\n";
63 
64 /** Enumeration of output formats */
65 enum format { KALIGNER, SAM };
66 
67 namespace opt {
68 	static unsigned k;
69 	static int threads = 2;
70 	static int printSeq;
71 	static unsigned section = 1;
72 	static unsigned nsections = 1;
73 
74 	/** Output formats */
75 	static int format;
76 }
77 
78 static const char shortopts[] = "ij:k:l:mo:s:v";
79 
80 
81 enum { OPT_HELP = 1, OPT_VERSION, OPT_SYNC };
82 
83 static const struct option longopts[] = {
84 	{ "kmer",        required_argument, NULL, 'k' },
85 	{ "section",     required_argument, NULL, 's' },
86 	{ "no-multi",    no_argument,     &opt::multimap, opt::ERROR },
87 	{ "multimap",    no_argument,     &opt::multimap, opt::MULTIMAP },
88 	{ "ignore-multimap", no_argument, &opt::multimap, opt::IGNORE },
89 	{ "threads",     required_argument,	NULL, 'j' },
90 	{ "verbose",     no_argument,       NULL, 'v' },
91 	{ "no-sam",      no_argument,       &opt::format, KALIGNER },
92 	{ "sam",         no_argument,       &opt::format, SAM },
93 	{ "no-seq",		 no_argument,		&opt::printSeq, 0 },
94 	{ "seq",		 no_argument,		&opt::printSeq, 1 },
95 	{ "help",        no_argument,       NULL, OPT_HELP },
96 	{ "version",     no_argument,       NULL, OPT_VERSION },
97 	{ NULL, 0, NULL, 0 }
98 };
99 
100 /** Return the number of k-mer in the specified file. */
countKmer(const string & path)101 static size_t countKmer(const string& path)
102 {
103 	struct stat st;
104 	if (stat(path.c_str(), &st) == -1) {
105 		perror(path.c_str());
106 		exit(EXIT_FAILURE);
107 	}
108 
109 	if (!S_ISREG(st.st_mode)) {
110 		cerr << "Not calculating k-mer in `" << path
111 			<< "', because it is not a regular file.\n";
112 		return 500000000;
113 	}
114 
115 	if (opt::verbose > 0)
116 		cerr << "Reading target `" << path << "'..." << endl;
117 
118 	ifstream in(path.c_str());
119 	assert(in.is_open());
120 	size_t scaffolds = 0, contigs = 0, bases = 0;
121 	enum { ID, SEQUENCE, GAP } state = SEQUENCE;
122 	for (char c; in.get(c);) {
123 		c = toupper(c);
124 		switch (state) {
125 		  case ID:
126 			if (c == '\n')
127 				state = SEQUENCE;
128 			break;
129 		  case SEQUENCE:
130 		  case GAP:
131 			switch (c) {
132 			  case '>':
133 				scaffolds++;
134 				contigs++;
135 				state = ID;
136 				break;
137 			  case 'N':
138 			  case 'B': case 'D': case 'H': case 'K': case 'M':
139 			  case 'R': case 'S': case 'V': case 'W': case 'Y':
140 				if (state != GAP)
141 					contigs++;
142 				state = GAP;
143 				break;
144 			  case 'A': case 'C': case 'G': case 'T':
145 			  case '0': case '1': case '2': case '3':
146 				bases++;
147 				state = SEQUENCE;
148 				break;
149 			  case '\n':
150 				break;
151 			  default:
152 				cerr << "error: unexpected character: "
153 					"`" << c << "'\n";
154 				exit(EXIT_FAILURE);
155 			}
156 			break;
157 		}
158 	}
159 
160 	size_t overlaps = contigs * (opt::k-1);
161 	size_t kmer = bases - overlaps;
162 	if (opt::verbose > 0) {
163 		cerr << "Read " << bases << " bases, "
164 			<< contigs << " contigs, " << scaffolds << " scaffolds"
165 			" from `" << path << "'. "
166 			"Expecting " << kmer << " k-mer.\n";
167 		cerr << "Index will use at least "
168 			<< toSI(kmer * sizeof(pair<Kmer, Position>))
169 			<< "B.\n";
170 	}
171 	assert(bases > overlaps);
172 	return kmer;
173 }
174 
175 template <class SeqPosHashMap>
176 static void readContigsIntoDB(string refFastaFile,
177 		Aligner<SeqPosHashMap>& aligner);
178 static void *alignReadsToDB(void *arg);
179 static void *readFile(void *arg);
180 
181 /** Unique aligner using map */
182 static Aligner<SeqPosHashUniqueMap> *g_aligner_u;
183 
184 /** Multimap aligner using multimap */
185 static Aligner<SeqPosHashMultiMap> *g_aligner_m;
186 
187 /** Number of reads. */
188 static unsigned g_readCount;
189 
190 /** Number of reads that aligned. */
191 static unsigned g_alignedCount;
192 
193 /** Guard cerr. */
194 static pthread_mutex_t g_mutexCerr = PTHREAD_MUTEX_INITIALIZER;
195 
196 /** Stores the output string and the read index number for an
197  * alignment. */
198 struct OutData
199 {
200 	string s;
201 	size_t index;
202 
OutDataOutData203 	OutData(string s = string(), size_t index = 0)
204 		: s(s), index(index) { }
205 
206 	/** Operator needed for sorting priority queue. */
operator <OutData207 	bool operator<(const OutData& a) const
208 	{
209 		// Smaller index number has higher priority.
210 		return index > a.index;
211 	}
212 };
213 
214 /** Shares data between workers and the output thread. */
215 static Pipe<OutData> g_pipeOut(1<<7);
216 
217 /** Shares data between producer and worker threads. */
218 static PipeMux<FastaRecord> g_pipeMux(1);
219 
220 /** Notification of the current size of the g_pqueue. */
221 static size_t g_pqSize;
222 static const size_t MAX_PQ_SIZE = 1000;
223 
224 /** Conditional variable used to block workers until the g_pqueue has
225  * become small enough. */
226 static pthread_cond_t g_pqSize_cv = PTHREAD_COND_INITIALIZER;
227 static pthread_mutex_t g_mutexPqSize = PTHREAD_MUTEX_INITIALIZER;
228 
printAlignments(void *)229 static void* printAlignments(void*)
230 {
231 	priority_queue<OutData> pqueue;
232 	size_t index = 1;
233 	for (pair<OutData, size_t> p = g_pipeOut.pop();
234 			p.second > 0; p = g_pipeOut.pop()) {
235 		pqueue.push(p.first);
236 		while (!pqueue.empty()) {
237 			const OutData& rec = pqueue.top();
238 			if (index == rec.index) {
239 				// Print the record at the current index.
240 				index++;
241 				assert(rec.index > 0);
242 				cout << rec.s;
243 				assert_good(cout, "stdout");
244 				pqueue.pop();
245 			} else if (g_pipeMux.invalidEntry(index)) {
246 				// Skip this index since it is invalid.
247 				index++;
248 			} else {
249 				// The record for this index has not been added, get
250 				// another record from the pipe.
251 				break;
252 			}
253 		}
254 
255 		// Let waiting workers continue if the pqueue is small enough.
256 		pthread_mutex_lock(&g_mutexPqSize);
257 		g_pqSize = pqueue.size();
258 		if (g_pqSize < MAX_PQ_SIZE)
259 			pthread_cond_broadcast(&g_pqSize_cv);
260 		pthread_mutex_unlock(&g_mutexPqSize);
261 	}
262 	return NULL;
263 }
264 
265 /** Store a FastaReader and Pipe. */
266 struct WorkerArg {
267 	FastaReader& in;
268 	Pipe<FastaRecord>& out;
WorkerArgWorkerArg269 	WorkerArg(FastaReader& in, Pipe<FastaRecord>& out)
270 		: in(in), out(out) { }
~WorkerArgWorkerArg271 	~WorkerArg() { delete &in; }
272 };
273 
getReadFiles(const char * readsFile)274 static pthread_t getReadFiles(const char *readsFile)
275 {
276 	if (opt::verbose > 0) {
277 		pthread_mutex_lock(&g_mutexCerr);
278 		cerr << "Reading `" << readsFile << "'...\n";
279 		pthread_mutex_unlock(&g_mutexCerr);
280 	}
281 
282 	FastaReader* in = new FastaReader(
283 			readsFile, FastaReader::FOLD_CASE);
284 	WorkerArg* arg = new WorkerArg(*in, *g_pipeMux.addPipe());
285 
286 	pthread_t thread;
287 	pthread_create(&thread, NULL, readFile, static_cast<void*>(arg));
288 
289 	return thread;
290 }
291 
main(int argc,char ** argv)292 int main(int argc, char** argv)
293 {
294 	string commandLine;
295 	{
296 		ostringstream ss;
297 		char** last = argv + argc - 1;
298 		copy(argv, last, ostream_iterator<const char *>(ss, " "));
299 		ss << *last;
300 		commandLine = ss.str();
301 	}
302 
303 	char delim = '/';
304 	bool die = false;
305 	for (int c; (c = getopt_long(argc, argv,
306 					shortopts, longopts, NULL)) != -1;) {
307 		istringstream arg(optarg != NULL ? optarg : "");
308 		switch (c) {
309 			case '?': die = true; break;
310 			case 'k': case 'l':
311 				arg >> opt::k;
312 				break;
313 			case 'm': opt::multimap = opt::MULTIMAP; break;
314 			case 'i': opt::multimap = opt::IGNORE; break;
315 			case 'j': arg >> opt::threads; break;
316 			case 'v': opt::verbose++; break;
317 			case 's': arg >> opt::section >> delim >>
318 					  opt::nsections; break;
319 			case OPT_HELP:
320 				cout << USAGE_MESSAGE;
321 				exit(EXIT_SUCCESS);
322 			case OPT_VERSION:
323 				cout << VERSION_MESSAGE;
324 				exit(EXIT_SUCCESS);
325 		}
326 		if (optarg != NULL && !arg.eof()) {
327 			cerr << PROGRAM ": invalid option: `-"
328 				<< (char)c << optarg << "'\n";
329 			exit(EXIT_FAILURE);
330 		}
331 	}
332 
333 	if (opt::section == 0 || opt::section > opt::nsections
334 			|| delim != '/') {
335 		cerr << PROGRAM ": -s, --section option is incorrectly set\n";
336 		die = true;
337 	}
338 
339 	if (opt::k <= 0) {
340 		cerr << PROGRAM ": missing -k,--kmer option\n";
341 		die = true;
342 	}
343 	Kmer::setLength(opt::k);
344 
345 	if (argc - optind < 2) {
346 		cerr << PROGRAM ": missing arguments\n";
347 		die = true;
348 	}
349 
350 	if (die) {
351 		cerr << "Try `" << PROGRAM
352 			<< " --help' for more information.\n";
353 		exit(EXIT_FAILURE);
354 	}
355 
356 	string refFastaFile(argv[--argc]);
357 
358 	int numQuery = argc - optind;
359 	if (opt::threads <= 0)
360 		opt::threads = numQuery;
361 
362 	// SAM headers.
363 	cout << "@HD\tVN:1.0\n"
364 		"@PG\tID:" PROGRAM "\tVN:" VERSION "\t"
365 		"CL:" << commandLine << '\n';
366 
367 	size_t numKmer = countKmer(refFastaFile);
368 	if (opt::multimap == opt::MULTIMAP) {
369 		g_aligner_m = new Aligner<SeqPosHashMultiMap>(opt::k,
370 				numKmer);
371 		readContigsIntoDB(refFastaFile, *g_aligner_m);
372 	} else {
373 #if HAVE_GOOGLE_SPARSE_HASH_MAP
374 		g_aligner_u = new Aligner<SeqPosHashUniqueMap>(opt::k,
375 				numKmer, 0.3);
376 #else
377 		g_aligner_u = new Aligner<SeqPosHashUniqueMap>(opt::k,
378 				numKmer);
379 #endif
380 		readContigsIntoDB(refFastaFile, *g_aligner_u);
381 	}
382 
383 	g_readCount = 0;
384 
385 	vector<pthread_t> producer_threads;
386 	transform(argv + optind, argv + argc,
387 			back_inserter(producer_threads), getReadFiles);
388 
389 	vector<pthread_t> threads;
390 	for (int i = 0; i < opt::threads; i++) {
391 		pthread_t thread;
392 		pthread_create(&thread, NULL, alignReadsToDB, NULL);
393 		threads.push_back(thread);
394 	}
395 
396 	pthread_t out_thread;
397 	pthread_create(&out_thread, NULL, printAlignments, NULL);
398 
399 	void *status;
400 	// Wait for all threads to finish.
401 	for (size_t i = 0; i < producer_threads.size(); i++)
402 		pthread_join(producer_threads[i], &status);
403 	for (size_t i = 0; i < threads.size(); i++)
404 		pthread_join(threads[i], &status);
405 	g_pipeOut.close();
406 	pthread_join(out_thread, &status);
407 
408 	if (opt::verbose > 0)
409 		cerr << "Aligned " << g_alignedCount
410 			<< " of " << g_readCount << " reads ("
411 			<< (float)100 * g_alignedCount / g_readCount << "%)\n";
412 
413 	if (opt::multimap == opt::MULTIMAP)
414 		delete g_aligner_m;
415 	else
416 		delete g_aligner_u;
417 
418 	return 0;
419 }
420 
421 template <class SeqPosHashMap>
printProgress(const Aligner<SeqPosHashMap> & align,unsigned count)422 static void printProgress(const Aligner<SeqPosHashMap>& align,
423 		unsigned count)
424 {
425 	size_t size = align.size();
426 	size_t buckets = align.bucket_count();
427 	cerr << "Read " << count << " contigs. "
428 		"Hash load: " << size << " / " << buckets
429 		<< " = " << (float)size / buckets
430 		<< " using " << toSI(getMemoryUsage()) << "B." << endl;
431 }
432 
433 template <class SeqPosHashMap>
readContigsIntoDB(string refFastaFile,Aligner<SeqPosHashMap> & aligner)434 static void readContigsIntoDB(string refFastaFile,
435 		Aligner<SeqPosHashMap>& aligner)
436 {
437 	if (opt::verbose > 0)
438 		cerr << "Reading target `" << refFastaFile << "'..." << endl;
439 
440 	unsigned count = 0;
441 	FastaReader in(refFastaFile.c_str(), FastaReader::FOLD_CASE);
442 	if (opt::nsections > 1)
443 		in.split(opt::section, opt::nsections);
444 	for (FastaRecord rec; in >> rec;) {
445 		if (count == 0) {
446 			// Detect colour-space contigs.
447 			opt::colourSpace = isdigit(rec.seq[0]);
448 		} else {
449 			if (opt::colourSpace)
450 				assert(isdigit(rec.seq[0]));
451 			else
452 				assert(isalpha(rec.seq[0]));
453 		}
454 
455 		cout << "@SQ\tSN:" << rec.id
456 			<< "\tLN:" << rec.seq.length() << '\n';
457 		aligner.addReferenceSequence(rec.id, rec.seq);
458 
459 		count++;
460 		if (opt::verbose > 0 && count % 100000 == 0)
461 			printProgress(aligner, count);
462 	}
463 	assert(in.eof());
464 	if (opt::verbose > 0)
465 		printProgress(aligner, count);
466 
467 	if (opt::multimap == opt::IGNORE) {
468 		// Count the number of duplicate k-mer in the target.
469 		size_t duplicates = aligner.countDuplicates();
470 		if (duplicates > 0)
471 			cerr << "Found " << duplicates
472 				<< " (" << (float)100 * duplicates / aligner.size()
473 				<< "%) duplicate k-mer.\n";
474 	}
475 }
476 
477 /** Read each fasta record from 'in', and add it to 'pipe'. */
readFile(FastaReader & in,Pipe<FastaRecord> & pipe)478 static void readFile(FastaReader& in, Pipe<FastaRecord>& pipe)
479 {
480 	for (FastaRecord rec; in >> rec;)
481 		pipe.push(rec);
482 	assert(in.eof());
483 	pipe.close();
484 }
485 
486 /** Producer thread. */
readFile(void * arg)487 static void* readFile(void* arg)
488 {
489 	WorkerArg* p = static_cast<WorkerArg*>(arg);
490 	readFile(p->in, p->out);
491 	delete p;
492 	return NULL;
493 }
494 
495 /** @Returns the time in seconds between [start, end]. */
timeDiff(const timeval & start,const timeval & end)496 static double timeDiff(const timeval& start, const timeval& end)
497 {
498 	double result = (double)end.tv_sec +
499 		(double)end.tv_usec/1000000.0;
500 	result -= (double)start.tv_sec +
501 		(double)start.tv_usec/1000000.0;
502 	return result;
503 }
504 
alignReadsToDB(void *)505 static void* alignReadsToDB(void*)
506 {
507 	opt::chastityFilter = false;
508 	opt::trimMasked = false;
509 	static timeval start, end;
510 
511 	pthread_mutex_lock(&g_mutexCerr);
512 	gettimeofday(&start, NULL);
513 	pthread_mutex_unlock(&g_mutexCerr);
514 
515 	for (pair<FastaRecord, size_t> recPair = g_pipeMux.nextValue();
516 			recPair.second > 0; recPair = g_pipeMux.nextValue()) {
517 		const FastaRecord& rec = recPair.first;
518 		const Sequence& seq = rec.seq;
519 		ostringstream output;
520 		if (seq.find_first_not_of("ACGT0123") == string::npos) {
521 			if (opt::colourSpace)
522 				assert(isdigit(seq[0]));
523 			else
524 				assert(isalpha(seq[0]));
525 		}
526 
527 		switch (opt::format) {
528 		  case KALIGNER:
529 			if (opt::multimap == opt::MULTIMAP)
530 				g_aligner_m->alignRead(rec.id, seq,
531 						affix_ostream_iterator<Alignment>(
532 							output, "\t"));
533 			else
534 				g_aligner_u->alignRead(rec.id, seq,
535 						affix_ostream_iterator<Alignment>(
536 							output, "\t"));
537 			break;
538 		  case SAM:
539 			if (opt::multimap == opt::MULTIMAP)
540 				g_aligner_m->alignRead(rec.id, seq,
541 						ostream_iterator<SAMRecord>(output, "\n"));
542 			else
543 				g_aligner_u->alignRead(rec.id, seq,
544 						ostream_iterator<SAMRecord>(output, "\n"));
545 			break;
546 		}
547 
548 		ostringstream out;
549 		string s = output.str();
550 		switch (opt::format) {
551 		  case KALIGNER:
552 			out << rec.id;
553 			if (opt::printSeq) {
554 				out << ' ';
555 				if (opt::colourSpace)
556 					out << rec.anchor;
557 				out << seq;
558 			}
559 			out << s << '\n';
560 			break;
561 		  case SAM:
562 			out << s;
563 			break;
564 		}
565 		g_pipeOut.push(OutData(out.str(), recPair.second));
566 
567 		// Prevent the priority_queue from growing too large by
568 		// waiting for threads going far too slow.
569 		pthread_mutex_lock(&g_mutexPqSize);
570 		if (g_pqSize >= MAX_PQ_SIZE)
571 			pthread_cond_wait(&g_pqSize_cv, &g_mutexPqSize);
572 		pthread_mutex_unlock(&g_mutexPqSize);
573 
574 		if (opt::verbose > 0) {
575 			pthread_mutex_lock(&g_mutexCerr);
576 			if (!s.empty())
577 				g_alignedCount++;
578 			if (++g_readCount % 1000000 == 0) {
579 				gettimeofday(&end, NULL);
580 				double result = timeDiff(start, end);
581 				cerr << "Aligned " << g_readCount << " reads at "
582 					<< (int)(1000000 / result) << " reads/sec.\n";
583 				start = end;
584 			}
585 			pthread_mutex_unlock(&g_mutexCerr);
586 		}
587 	}
588 	return NULL;
589 }
590