1 #include "Aligner.h"
2 #include "Common/Options.h"
3 #include "DataLayer/Options.h"
4 #include "KAligner/Options.h"
5 #include "FastaReader.h"
6 #include "Iterator.h"
7 #include "IOUtil.h"
8 #include "MemoryUtil.h"
9 #include "SAM.h"
10 #include "StringUtil.h" // for toSI
11 #include "Uncompress.h"
12 #include "Pipe.h"
13 #include "PipeMux.h"
14 #include <algorithm>
15 #include <cassert>
16 #include <cctype>
17 #include <cerrno>
18 #include <cstdlib>
19 #include <cstring>
20 #include <fstream>
21 #include <getopt.h>
22 #include <iostream>
23 #include <pthread.h>
24 #include <sstream>
25 #include <string>
26 #include <sys/stat.h>
27 #include <sys/time.h>
28
29 using namespace std;
30
31 #define PROGRAM "KAligner"
32
33 static const char VERSION_MESSAGE[] =
34 PROGRAM " (" PACKAGE_NAME ") " VERSION "\n"
35 "Written by Jared Simpson and Shaun Jackman.\n"
36 "\n"
37 "Copyright 2014 Canada's Michael Smith Genome Sciences Centre\n";
38
39 static const char USAGE_MESSAGE[] =
40 "Usage: " PROGRAM " -k<kmer> [OPTION]... QUERY... TARGET\n"
41 "Align the sequences of the files QUERY to those of TARGET.\n"
42 "All perfect matches of at least k bases will be found.\n"
43 "\n"
44 " Options:\n"
45 "\n"
46 " -k, -l, --kmer=N k-mer size and minimum alignment length\n"
47 " -s, --section=S/N split the target into N sections and align\n"
48 " reads to section S [1/1]\n"
49 " -i, --ignore-multimap ignore duplicate k-mer in the target\n"
50 " [default]\n"
51 " -m, --multimap allow duplicate k-mer in the target\n"
52 " --no-multimap disallow duplicate k-mer in the target\n"
53 " -j, --threads=N use N threads [2] up to one per query file\n"
54 " or if N is 0 use one thread per query file\n"
55 " -v, --verbose display verbose output\n"
56 " --no-sam output the results in KAligner format\n"
57 " --sam output the results in SAM format\n"
58 " --seq print the sequence with the alignments\n"
59 " --help display this help and exit\n"
60 " --version output version information and exit\n"
61 "\n"
62 "Report bugs to <" PACKAGE_BUGREPORT ">.\n";
63
64 /** Enumeration of output formats */
65 enum format { KALIGNER, SAM };
66
67 namespace opt {
68 static unsigned k;
69 static int threads = 2;
70 static int printSeq;
71 static unsigned section = 1;
72 static unsigned nsections = 1;
73
74 /** Output formats */
75 static int format;
76 }
77
78 static const char shortopts[] = "ij:k:l:mo:s:v";
79
80
81 enum { OPT_HELP = 1, OPT_VERSION, OPT_SYNC };
82
83 static const struct option longopts[] = {
84 { "kmer", required_argument, NULL, 'k' },
85 { "section", required_argument, NULL, 's' },
86 { "no-multi", no_argument, &opt::multimap, opt::ERROR },
87 { "multimap", no_argument, &opt::multimap, opt::MULTIMAP },
88 { "ignore-multimap", no_argument, &opt::multimap, opt::IGNORE },
89 { "threads", required_argument, NULL, 'j' },
90 { "verbose", no_argument, NULL, 'v' },
91 { "no-sam", no_argument, &opt::format, KALIGNER },
92 { "sam", no_argument, &opt::format, SAM },
93 { "no-seq", no_argument, &opt::printSeq, 0 },
94 { "seq", no_argument, &opt::printSeq, 1 },
95 { "help", no_argument, NULL, OPT_HELP },
96 { "version", no_argument, NULL, OPT_VERSION },
97 { NULL, 0, NULL, 0 }
98 };
99
100 /** Return the number of k-mer in the specified file. */
countKmer(const string & path)101 static size_t countKmer(const string& path)
102 {
103 struct stat st;
104 if (stat(path.c_str(), &st) == -1) {
105 perror(path.c_str());
106 exit(EXIT_FAILURE);
107 }
108
109 if (!S_ISREG(st.st_mode)) {
110 cerr << "Not calculating k-mer in `" << path
111 << "', because it is not a regular file.\n";
112 return 500000000;
113 }
114
115 if (opt::verbose > 0)
116 cerr << "Reading target `" << path << "'..." << endl;
117
118 ifstream in(path.c_str());
119 assert(in.is_open());
120 size_t scaffolds = 0, contigs = 0, bases = 0;
121 enum { ID, SEQUENCE, GAP } state = SEQUENCE;
122 for (char c; in.get(c);) {
123 c = toupper(c);
124 switch (state) {
125 case ID:
126 if (c == '\n')
127 state = SEQUENCE;
128 break;
129 case SEQUENCE:
130 case GAP:
131 switch (c) {
132 case '>':
133 scaffolds++;
134 contigs++;
135 state = ID;
136 break;
137 case 'N':
138 case 'B': case 'D': case 'H': case 'K': case 'M':
139 case 'R': case 'S': case 'V': case 'W': case 'Y':
140 if (state != GAP)
141 contigs++;
142 state = GAP;
143 break;
144 case 'A': case 'C': case 'G': case 'T':
145 case '0': case '1': case '2': case '3':
146 bases++;
147 state = SEQUENCE;
148 break;
149 case '\n':
150 break;
151 default:
152 cerr << "error: unexpected character: "
153 "`" << c << "'\n";
154 exit(EXIT_FAILURE);
155 }
156 break;
157 }
158 }
159
160 size_t overlaps = contigs * (opt::k-1);
161 size_t kmer = bases - overlaps;
162 if (opt::verbose > 0) {
163 cerr << "Read " << bases << " bases, "
164 << contigs << " contigs, " << scaffolds << " scaffolds"
165 " from `" << path << "'. "
166 "Expecting " << kmer << " k-mer.\n";
167 cerr << "Index will use at least "
168 << toSI(kmer * sizeof(pair<Kmer, Position>))
169 << "B.\n";
170 }
171 assert(bases > overlaps);
172 return kmer;
173 }
174
175 template <class SeqPosHashMap>
176 static void readContigsIntoDB(string refFastaFile,
177 Aligner<SeqPosHashMap>& aligner);
178 static void *alignReadsToDB(void *arg);
179 static void *readFile(void *arg);
180
181 /** Unique aligner using map */
182 static Aligner<SeqPosHashUniqueMap> *g_aligner_u;
183
184 /** Multimap aligner using multimap */
185 static Aligner<SeqPosHashMultiMap> *g_aligner_m;
186
187 /** Number of reads. */
188 static unsigned g_readCount;
189
190 /** Number of reads that aligned. */
191 static unsigned g_alignedCount;
192
193 /** Guard cerr. */
194 static pthread_mutex_t g_mutexCerr = PTHREAD_MUTEX_INITIALIZER;
195
196 /** Stores the output string and the read index number for an
197 * alignment. */
198 struct OutData
199 {
200 string s;
201 size_t index;
202
OutDataOutData203 OutData(string s = string(), size_t index = 0)
204 : s(s), index(index) { }
205
206 /** Operator needed for sorting priority queue. */
operator <OutData207 bool operator<(const OutData& a) const
208 {
209 // Smaller index number has higher priority.
210 return index > a.index;
211 }
212 };
213
214 /** Shares data between workers and the output thread. */
215 static Pipe<OutData> g_pipeOut(1<<7);
216
217 /** Shares data between producer and worker threads. */
218 static PipeMux<FastaRecord> g_pipeMux(1);
219
220 /** Notification of the current size of the g_pqueue. */
221 static size_t g_pqSize;
222 static const size_t MAX_PQ_SIZE = 1000;
223
224 /** Conditional variable used to block workers until the g_pqueue has
225 * become small enough. */
226 static pthread_cond_t g_pqSize_cv = PTHREAD_COND_INITIALIZER;
227 static pthread_mutex_t g_mutexPqSize = PTHREAD_MUTEX_INITIALIZER;
228
printAlignments(void *)229 static void* printAlignments(void*)
230 {
231 priority_queue<OutData> pqueue;
232 size_t index = 1;
233 for (pair<OutData, size_t> p = g_pipeOut.pop();
234 p.second > 0; p = g_pipeOut.pop()) {
235 pqueue.push(p.first);
236 while (!pqueue.empty()) {
237 const OutData& rec = pqueue.top();
238 if (index == rec.index) {
239 // Print the record at the current index.
240 index++;
241 assert(rec.index > 0);
242 cout << rec.s;
243 assert_good(cout, "stdout");
244 pqueue.pop();
245 } else if (g_pipeMux.invalidEntry(index)) {
246 // Skip this index since it is invalid.
247 index++;
248 } else {
249 // The record for this index has not been added, get
250 // another record from the pipe.
251 break;
252 }
253 }
254
255 // Let waiting workers continue if the pqueue is small enough.
256 pthread_mutex_lock(&g_mutexPqSize);
257 g_pqSize = pqueue.size();
258 if (g_pqSize < MAX_PQ_SIZE)
259 pthread_cond_broadcast(&g_pqSize_cv);
260 pthread_mutex_unlock(&g_mutexPqSize);
261 }
262 return NULL;
263 }
264
265 /** Store a FastaReader and Pipe. */
266 struct WorkerArg {
267 FastaReader& in;
268 Pipe<FastaRecord>& out;
WorkerArgWorkerArg269 WorkerArg(FastaReader& in, Pipe<FastaRecord>& out)
270 : in(in), out(out) { }
~WorkerArgWorkerArg271 ~WorkerArg() { delete ∈ }
272 };
273
getReadFiles(const char * readsFile)274 static pthread_t getReadFiles(const char *readsFile)
275 {
276 if (opt::verbose > 0) {
277 pthread_mutex_lock(&g_mutexCerr);
278 cerr << "Reading `" << readsFile << "'...\n";
279 pthread_mutex_unlock(&g_mutexCerr);
280 }
281
282 FastaReader* in = new FastaReader(
283 readsFile, FastaReader::FOLD_CASE);
284 WorkerArg* arg = new WorkerArg(*in, *g_pipeMux.addPipe());
285
286 pthread_t thread;
287 pthread_create(&thread, NULL, readFile, static_cast<void*>(arg));
288
289 return thread;
290 }
291
main(int argc,char ** argv)292 int main(int argc, char** argv)
293 {
294 string commandLine;
295 {
296 ostringstream ss;
297 char** last = argv + argc - 1;
298 copy(argv, last, ostream_iterator<const char *>(ss, " "));
299 ss << *last;
300 commandLine = ss.str();
301 }
302
303 char delim = '/';
304 bool die = false;
305 for (int c; (c = getopt_long(argc, argv,
306 shortopts, longopts, NULL)) != -1;) {
307 istringstream arg(optarg != NULL ? optarg : "");
308 switch (c) {
309 case '?': die = true; break;
310 case 'k': case 'l':
311 arg >> opt::k;
312 break;
313 case 'm': opt::multimap = opt::MULTIMAP; break;
314 case 'i': opt::multimap = opt::IGNORE; break;
315 case 'j': arg >> opt::threads; break;
316 case 'v': opt::verbose++; break;
317 case 's': arg >> opt::section >> delim >>
318 opt::nsections; break;
319 case OPT_HELP:
320 cout << USAGE_MESSAGE;
321 exit(EXIT_SUCCESS);
322 case OPT_VERSION:
323 cout << VERSION_MESSAGE;
324 exit(EXIT_SUCCESS);
325 }
326 if (optarg != NULL && !arg.eof()) {
327 cerr << PROGRAM ": invalid option: `-"
328 << (char)c << optarg << "'\n";
329 exit(EXIT_FAILURE);
330 }
331 }
332
333 if (opt::section == 0 || opt::section > opt::nsections
334 || delim != '/') {
335 cerr << PROGRAM ": -s, --section option is incorrectly set\n";
336 die = true;
337 }
338
339 if (opt::k <= 0) {
340 cerr << PROGRAM ": missing -k,--kmer option\n";
341 die = true;
342 }
343 Kmer::setLength(opt::k);
344
345 if (argc - optind < 2) {
346 cerr << PROGRAM ": missing arguments\n";
347 die = true;
348 }
349
350 if (die) {
351 cerr << "Try `" << PROGRAM
352 << " --help' for more information.\n";
353 exit(EXIT_FAILURE);
354 }
355
356 string refFastaFile(argv[--argc]);
357
358 int numQuery = argc - optind;
359 if (opt::threads <= 0)
360 opt::threads = numQuery;
361
362 // SAM headers.
363 cout << "@HD\tVN:1.0\n"
364 "@PG\tID:" PROGRAM "\tVN:" VERSION "\t"
365 "CL:" << commandLine << '\n';
366
367 size_t numKmer = countKmer(refFastaFile);
368 if (opt::multimap == opt::MULTIMAP) {
369 g_aligner_m = new Aligner<SeqPosHashMultiMap>(opt::k,
370 numKmer);
371 readContigsIntoDB(refFastaFile, *g_aligner_m);
372 } else {
373 #if HAVE_GOOGLE_SPARSE_HASH_MAP
374 g_aligner_u = new Aligner<SeqPosHashUniqueMap>(opt::k,
375 numKmer, 0.3);
376 #else
377 g_aligner_u = new Aligner<SeqPosHashUniqueMap>(opt::k,
378 numKmer);
379 #endif
380 readContigsIntoDB(refFastaFile, *g_aligner_u);
381 }
382
383 g_readCount = 0;
384
385 vector<pthread_t> producer_threads;
386 transform(argv + optind, argv + argc,
387 back_inserter(producer_threads), getReadFiles);
388
389 vector<pthread_t> threads;
390 for (int i = 0; i < opt::threads; i++) {
391 pthread_t thread;
392 pthread_create(&thread, NULL, alignReadsToDB, NULL);
393 threads.push_back(thread);
394 }
395
396 pthread_t out_thread;
397 pthread_create(&out_thread, NULL, printAlignments, NULL);
398
399 void *status;
400 // Wait for all threads to finish.
401 for (size_t i = 0; i < producer_threads.size(); i++)
402 pthread_join(producer_threads[i], &status);
403 for (size_t i = 0; i < threads.size(); i++)
404 pthread_join(threads[i], &status);
405 g_pipeOut.close();
406 pthread_join(out_thread, &status);
407
408 if (opt::verbose > 0)
409 cerr << "Aligned " << g_alignedCount
410 << " of " << g_readCount << " reads ("
411 << (float)100 * g_alignedCount / g_readCount << "%)\n";
412
413 if (opt::multimap == opt::MULTIMAP)
414 delete g_aligner_m;
415 else
416 delete g_aligner_u;
417
418 return 0;
419 }
420
421 template <class SeqPosHashMap>
printProgress(const Aligner<SeqPosHashMap> & align,unsigned count)422 static void printProgress(const Aligner<SeqPosHashMap>& align,
423 unsigned count)
424 {
425 size_t size = align.size();
426 size_t buckets = align.bucket_count();
427 cerr << "Read " << count << " contigs. "
428 "Hash load: " << size << " / " << buckets
429 << " = " << (float)size / buckets
430 << " using " << toSI(getMemoryUsage()) << "B." << endl;
431 }
432
433 template <class SeqPosHashMap>
readContigsIntoDB(string refFastaFile,Aligner<SeqPosHashMap> & aligner)434 static void readContigsIntoDB(string refFastaFile,
435 Aligner<SeqPosHashMap>& aligner)
436 {
437 if (opt::verbose > 0)
438 cerr << "Reading target `" << refFastaFile << "'..." << endl;
439
440 unsigned count = 0;
441 FastaReader in(refFastaFile.c_str(), FastaReader::FOLD_CASE);
442 if (opt::nsections > 1)
443 in.split(opt::section, opt::nsections);
444 for (FastaRecord rec; in >> rec;) {
445 if (count == 0) {
446 // Detect colour-space contigs.
447 opt::colourSpace = isdigit(rec.seq[0]);
448 } else {
449 if (opt::colourSpace)
450 assert(isdigit(rec.seq[0]));
451 else
452 assert(isalpha(rec.seq[0]));
453 }
454
455 cout << "@SQ\tSN:" << rec.id
456 << "\tLN:" << rec.seq.length() << '\n';
457 aligner.addReferenceSequence(rec.id, rec.seq);
458
459 count++;
460 if (opt::verbose > 0 && count % 100000 == 0)
461 printProgress(aligner, count);
462 }
463 assert(in.eof());
464 if (opt::verbose > 0)
465 printProgress(aligner, count);
466
467 if (opt::multimap == opt::IGNORE) {
468 // Count the number of duplicate k-mer in the target.
469 size_t duplicates = aligner.countDuplicates();
470 if (duplicates > 0)
471 cerr << "Found " << duplicates
472 << " (" << (float)100 * duplicates / aligner.size()
473 << "%) duplicate k-mer.\n";
474 }
475 }
476
477 /** Read each fasta record from 'in', and add it to 'pipe'. */
readFile(FastaReader & in,Pipe<FastaRecord> & pipe)478 static void readFile(FastaReader& in, Pipe<FastaRecord>& pipe)
479 {
480 for (FastaRecord rec; in >> rec;)
481 pipe.push(rec);
482 assert(in.eof());
483 pipe.close();
484 }
485
486 /** Producer thread. */
readFile(void * arg)487 static void* readFile(void* arg)
488 {
489 WorkerArg* p = static_cast<WorkerArg*>(arg);
490 readFile(p->in, p->out);
491 delete p;
492 return NULL;
493 }
494
495 /** @Returns the time in seconds between [start, end]. */
timeDiff(const timeval & start,const timeval & end)496 static double timeDiff(const timeval& start, const timeval& end)
497 {
498 double result = (double)end.tv_sec +
499 (double)end.tv_usec/1000000.0;
500 result -= (double)start.tv_sec +
501 (double)start.tv_usec/1000000.0;
502 return result;
503 }
504
alignReadsToDB(void *)505 static void* alignReadsToDB(void*)
506 {
507 opt::chastityFilter = false;
508 opt::trimMasked = false;
509 static timeval start, end;
510
511 pthread_mutex_lock(&g_mutexCerr);
512 gettimeofday(&start, NULL);
513 pthread_mutex_unlock(&g_mutexCerr);
514
515 for (pair<FastaRecord, size_t> recPair = g_pipeMux.nextValue();
516 recPair.second > 0; recPair = g_pipeMux.nextValue()) {
517 const FastaRecord& rec = recPair.first;
518 const Sequence& seq = rec.seq;
519 ostringstream output;
520 if (seq.find_first_not_of("ACGT0123") == string::npos) {
521 if (opt::colourSpace)
522 assert(isdigit(seq[0]));
523 else
524 assert(isalpha(seq[0]));
525 }
526
527 switch (opt::format) {
528 case KALIGNER:
529 if (opt::multimap == opt::MULTIMAP)
530 g_aligner_m->alignRead(rec.id, seq,
531 affix_ostream_iterator<Alignment>(
532 output, "\t"));
533 else
534 g_aligner_u->alignRead(rec.id, seq,
535 affix_ostream_iterator<Alignment>(
536 output, "\t"));
537 break;
538 case SAM:
539 if (opt::multimap == opt::MULTIMAP)
540 g_aligner_m->alignRead(rec.id, seq,
541 ostream_iterator<SAMRecord>(output, "\n"));
542 else
543 g_aligner_u->alignRead(rec.id, seq,
544 ostream_iterator<SAMRecord>(output, "\n"));
545 break;
546 }
547
548 ostringstream out;
549 string s = output.str();
550 switch (opt::format) {
551 case KALIGNER:
552 out << rec.id;
553 if (opt::printSeq) {
554 out << ' ';
555 if (opt::colourSpace)
556 out << rec.anchor;
557 out << seq;
558 }
559 out << s << '\n';
560 break;
561 case SAM:
562 out << s;
563 break;
564 }
565 g_pipeOut.push(OutData(out.str(), recPair.second));
566
567 // Prevent the priority_queue from growing too large by
568 // waiting for threads going far too slow.
569 pthread_mutex_lock(&g_mutexPqSize);
570 if (g_pqSize >= MAX_PQ_SIZE)
571 pthread_cond_wait(&g_pqSize_cv, &g_mutexPqSize);
572 pthread_mutex_unlock(&g_mutexPqSize);
573
574 if (opt::verbose > 0) {
575 pthread_mutex_lock(&g_mutexCerr);
576 if (!s.empty())
577 g_alignedCount++;
578 if (++g_readCount % 1000000 == 0) {
579 gettimeofday(&end, NULL);
580 double result = timeDiff(start, end);
581 cerr << "Aligned " << g_readCount << " reads at "
582 << (int)(1000000 / result) << " reads/sec.\n";
583 start = end;
584 }
585 pthread_mutex_unlock(&g_mutexCerr);
586 }
587 }
588 return NULL;
589 }
590