1 #ifndef RAZERS_RAZERS_PARALLEL_H_
2 #define RAZERS_RAZERS_PARALLEL_H_
3
4 // TODO(holtgrew): Ideally, we do not need any locks.
5
6 #include "parallel_misc.h"
7 #include "parallel_job_queue.h"
8 #include "razers_match_filter.h"
9
10 namespace seqan {
11
12 // ===========================================================================
13 // Enums, Tags, Classes, Specializations
14 // ===========================================================================
15
16 template <typename TSpec>
17 class Lock;
18
19 struct Omp_;
20 typedef Tag<Omp_> Omp;
21
22 template <>
23 class Lock<Omp>
24 {
25 public:
26 omp_lock_t lock_;
27
Lock()28 Lock() { omp_init_lock(&lock_); }
29
~Lock()30 ~Lock() { omp_destroy_lock(&lock_); }
31 };
32
// Result of one verification job: the matches found plus the hit group
// and filtration window they originate from, so the receiving thread can
// sort the result into the correct bucket.
template <typename TMatches>
struct SingleVerificationResult
{
    // Shared ownership: the container is handed from the verifying thread
    // to the thread that owns the target bucket.
    std::shared_ptr<TMatches> matches;
    unsigned hitGroupId{0};
    unsigned windowNo{0};

    SingleVerificationResult() = default;

    // Sink parameter taken by value and moved in; accepts both lvalue and
    // rvalue shared_ptr arguments (the old non-const ref rejected rvalues).
    SingleVerificationResult(std::shared_ptr<TMatches> matches_, unsigned hitGroupId_, unsigned windowNo_) :
        matches(std::move(matches_)), hitGroupId(hitGroupId_), windowNo(windowNo_)
    {}
};
48
49 // Stores the results of the verification.
50 //
51 // Put into its own class so it can be locked independently of other class
52 // members.
53 template <typename TMatches>
54 class SingleVerificationResults
55 {
56 public:
57 String<SingleVerificationResult<TMatches> > localMatches;
58 Lock<Omp> * lock;
59
SingleVerificationResults()60 SingleVerificationResults() :
61 lock(new Lock<Omp>()) {}
62
SingleVerificationResults(SingleVerificationResults const & other)63 SingleVerificationResults(SingleVerificationResults const & other) :
64 localMatches(other.localMatches), lock(new Lock<Omp>())
65 {
66 // Not thread-safe copying since this is only used at the beginning when resizing block local storages string.
67 }
68
69 SingleVerificationResults & operator=(SingleVerificationResults const & other)
70 {
71 if (this == &other)
72 return *this;
73
74 localMatches = other.localMatches;
75 return *this;
76 }
77
~SingleVerificationResults()78 ~SingleVerificationResults()
79 {
80 delete lock;
81 }
82
83 };
84
// Tag selecting the ThreadLocalStorage specialization for single-end read
// mapping; carries all type parameters of that specialization.
template <
    typename TMatches,
    typename TFragmentStore,
    typename TFilterFinder,
    typename TFilterPattern,
    typename TShape /*TODO(holtgrew): Superflous.*/,
    typename TOptions,
    typename TCounts,
    typename TRazerSMode>
struct MapSingleReads {};
95
template <typename TSpec>
class ThreadLocalStorage;

// ThreadLocalStorage specialization for single-end read mapping in RazerS.
//
// Holds everything one worker thread needs: its own copy of the options,
// its own filtration finder/pattern, its local match store, and the
// bucket structures used to write back verification results in window
// order (see writeBackToLocal()).
template <
    typename TMatches_,
    typename TFragmentStore,
    typename TFilterFinder_,
    typename TFilterPattern_,
    typename TShape /*TODO(holtgrew): Superflous.*/,
    typename TOptions,
    typename TCounts,
    typename TRazerSMode
    >
class ThreadLocalStorage<
    MapSingleReads<
        TMatches_,
        TFragmentStore,
        TFilterFinder_,
        TFilterPattern_,
        TShape,
        TOptions,
        TCounts,
        TRazerSMode> >
{
public:
    typedef TFilterPattern_ TFilterPattern;
    typedef TFilterFinder_ TFilterFinder;

    typedef TMatches_ TMatches;
#ifdef RAZERS_EXTERNAL_MATCHES
    typedef TMatches TLargeMatches;
#else // #ifdef RAZERS_EXTERNAL_MATCHES
    // Without external matches, the large match store is memory-mapped.
    typedef typename Value<TMatches>::Type TMatchRecord;
    typedef String<TMatchRecord, MMap<ExternalConfigLarge<> > > TLargeMatches;
#endif // #ifdef RAZERS_EXTERNAL_MATCHES

    // The id of this thread.
    unsigned threadId;

    // Each thread needs its local options since the compactionThreshold is changed.
    // TODO(holtgrew): Change overall program structure so this is factorized out of the options struct.
    TOptions options;
    TOptions /*const*/ * globalOptions;

    // Each thread has its own SWIFT finder and pattern object.
    TFilterFinder filterFinder;
    TFilterPattern filterPattern;

    TCounts counts;  // TODO(holtgrew): Artifact?

    TLargeMatches matches;  // TODO(holtgrew): However, not used in global store since global reads-to-reference alignment requires everything to be in memory
    TFragmentStore /*const*/ * globalStore;

    TShape shape;

    // Per-filtration-window buckets of verification results and, per
    // window, the number of result packages still missing before the
    // bucket is complete and can be flushed.
    String<String<SingleVerificationResult<TMatches> > > verificationResultBuckets;
    String<unsigned> missingInBucket;
    // Index of the next (window) bucket to flush into matches.
    unsigned nextBucketToWriteBack;

    typedef MatchVerifier<TFragmentStore, TMatches, TOptions, TRazerSMode, TFilterPattern, TCounts> TMatchVerifier;
    TMatchVerifier verifier;

    // Mailbox for the verification results.
    SingleVerificationResults<TMatches> verificationResults;

    typedef MatchFilter<typename Spec<TOptions>::Type, typename TFragmentStore::TReadSeqStore, ThreadLocalStorage> TMatchFilter;
    std::shared_ptr<TMatchFilter> matchFilter;

    // Read id splitters; splitters[i] appears to be the first global read
    // id assigned to thread i (see setMaxErrors()) — TODO confirm.
    String<unsigned> splitters;

    ThreadLocalStorage() {}
};
169
// Set the maximal number of errors allowed for the given (global) read id
// in this thread's filter pattern.
template <typename TSpec>
inline void limitRead(ThreadLocalStorage<TSpec> & tls, unsigned readId, int newLimit)
{
    setMaxErrors(tls, readId, newLimit);
}
175
// Disable the given (global) read id in this thread's filter pattern by
// setting its maximal error count to -1.
template <typename TSpec>
inline void disableRead(ThreadLocalStorage<TSpec> & tls, unsigned readId)
{
    setMaxErrors(tls, readId, -1);
}
181
182 template <typename TMatches, typename TFragmentStore, typename THitString, typename TOptions, typename TFilterPattern>
183 struct SingleVerification;
184
185 template <typename TMatches, typename TFragmentStore, typename THitString_, typename TOptions, typename TFilterPattern>
186 class Job<SingleVerification<TMatches, TFragmentStore, THitString_, TOptions, TFilterPattern> >
187 {
188 public:
189 typedef SingleVerificationResults<TMatches> TVerificationResults;
190 typedef THitString_ THitString;
191
192 int threadId;
193 TVerificationResults * verificationResults;
194 TFragmentStore * globalStore;
195 unsigned contigId;
196 unsigned windowNo;
197 std::shared_ptr<THitString> hitsPtr;
198 unsigned hitGroupId;
199 unsigned hitBegin;
200 unsigned hitEnd;
201 TOptions * options;
202 TFilterPattern * filterPattern;
203
Job()204 Job() {}
205
Job(int threadId_,TVerificationResults & verificationResults_,TFragmentStore & globalStore_,unsigned contigId_,unsigned windowNo_,std::shared_ptr<THitString> & hitsPtr_,unsigned hitGroupId_,unsigned hitBegin_,unsigned hitEnd_,TOptions & options_,TFilterPattern & filterPattern_)206 Job(int threadId_, TVerificationResults & verificationResults_, TFragmentStore & globalStore_, unsigned contigId_, unsigned windowNo_, std::shared_ptr<THitString> & hitsPtr_, unsigned hitGroupId_, unsigned hitBegin_, unsigned hitEnd_, TOptions & options_, TFilterPattern & filterPattern_) :
207 threadId(threadId_), verificationResults(&verificationResults_), globalStore(&globalStore_), contigId(contigId_), windowNo(windowNo_), hitsPtr(hitsPtr_), hitGroupId(hitGroupId_), hitBegin(hitBegin_), hitEnd(hitEnd_), options(&options_), filterPattern(&filterPattern_)
208 {}
209 };
210
211 // ===========================================================================
212 // Metafunctions
213 // ===========================================================================
214
215 // ===========================================================================
216 // Functions
217 // ===========================================================================
218
219 template <typename TMatches>
220 inline void
appendToVerificationResults(SingleVerificationResults<TMatches> & verificationResults,SingleVerificationResult<TMatches> const & results)221 appendToVerificationResults(SingleVerificationResults<TMatches> & verificationResults, SingleVerificationResult<TMatches> const & results)
222 {
223 omp_set_lock(&verificationResults.lock->lock_);
224 // if (length(verificationResults.localMatches) > 0u)
225 // SEQAN_OMP_PRAGMA(critical)
226 // std::cerr << "BEFORE: " << &verificationResults << " length(*front(verificationResults.localMatches).matches) == " << length(*front(verificationResults.localMatches).matches) << ", " << (void*)front(verificationResults.localMatches).matches.get() << std::endl;
227 appendValue(verificationResults.localMatches, results);
228 // SEQAN_OMP_PRAGMA(critical)
229 // std::cerr << "AFTER: " << &verificationResults << " length(*front(verificationResults.localMatches).matches) == " << length(*front(verificationResults.localMatches).matches) << ", " << (void*)front(verificationResults.localMatches).matches.get() << std::endl;
230 omp_unset_lock(&verificationResults.lock->lock_);
231 }
232
233 // Uses the read ID to find the correct SWIFT pattern of the TLS, and
234 // the correct local ID within this SWIFT pattern to update the max
235 // errors.
236 //
237 // We do not disable the read right
238 template <typename TSpec, typename TReadNo, typename TMaxErrors>
239 inline void
setMaxErrors(ThreadLocalStorage<TSpec> & tls,TReadNo readNo,TMaxErrors maxErrors)240 setMaxErrors(ThreadLocalStorage<TSpec> & tls, TReadNo readNo, TMaxErrors maxErrors)
241 {
242 int localReadNo = readNo - tls.splitters[tls.threadId];
243 setMaxErrors(tls.filterPattern, localReadNo, maxErrors);
244 }
245
// Verify the SWIFT hits [job.hitBegin, job.hitEnd) of *job.hitsPtr and
// deliver the resulting matches to the job owner's result mailbox via
// appendToVerificationResults().
//
// Runs on an arbitrary (possibly stealing) thread; tls is the *executing*
// thread's local storage, whose verifier is reconfigured for this job.
// splitters maps job.threadId to the first global read id of that thread.
template <
    typename TMatches,
    typename TFragmentStore,
    typename TFilterFinder,
    typename TFilterPattern,
    typename TShape /*TODO(holtgrew): Superflous.*/,
    typename TOptions,
    typename TCounts,
    typename TRazerSMode,
    typename THitString>
void workVerification(ThreadLocalStorage<MapSingleReads<TMatches, TFragmentStore, TFilterFinder, TFilterPattern, TShape, TOptions, TCounts, TRazerSMode> > & tls,
                      Job<SingleVerification<TMatches, TFragmentStore, THitString, TOptions, TFilterPattern> > & job,
                      String<unsigned> const & splitters)
{
#ifdef RAZERS_PROFILE
    timelineBeginTask(TASK_VERIFY);
#endif  // #ifdef RAZERS_PROFILE
    double start = sysTime();

    typedef typename Iterator<THitString, Standard>::Type THitStringIterator;
    typedef typename TFragmentStore::TContigSeq TContigSeq;

    TContigSeq & contigSeq = job.globalStore->contigStore[job.contigId].seq;
#ifdef RAZERS_BANDED_MYERS
    int64_t contigLength = length(contigSeq);
#endif

    // Fresh result container for this job; resize(1)+clear presumably
    // forces an allocation up front — TODO confirm intent.
    std::shared_ptr<TMatches> localMatches(new TMatches());
    resize(*localMatches, 1);
    clear(*localMatches);

    // Point the verifier of the executing thread at this job's data.
    tls.verifier.matches = localMatches.get();
    tls.verifier.options = job.options;
    tls.verifier.filterPattern = job.filterPattern;
    tls.verifier.cnts = 0;

    // Hit records carry thread-local read numbers; offset converts them
    // to global read ids.
    unsigned offset = splitters[job.threadId];
    for (THitStringIterator it = iter(*job.hitsPtr, job.hitBegin), itEnd = iter(*job.hitsPtr, job.hitEnd); it != itEnd; ++it)
    {
        // if (length(swiftInfix(value(it), job.globalStore->contigStore[job.contigId].seq)) < length(tls.globalStore->readSeqStore[value(it).ndlSeqNo]))
        //     continue;  // Skip if hit length < read length. TODO(holtgrew): David has to fix something in banded myers to make this work.

        unsigned absReadId = offset + value(it).ndlSeqNo;
        tls.verifier.m.readId = absReadId;

#ifdef RAZERS_BANDED_MYERS
        tls.verifier.patternState.leftClip = (it->hstkPos >= 0) ? 0 : -it->hstkPos;  // left clip if match begins left of the genome
        tls.verifier.rightClip = (it->hstkPos + it->bucketWidth <= contigLength) ? 0 : it->hstkPos + it->bucketWidth - contigLength;  // right clip if match end right of the genome
#endif
        matchVerify(tls.verifier, swiftInfix(value(it), contigSeq), absReadId, tls.globalStore->readSeqStore[absReadId], TRazerSMode());
    }

    // Hand the (possibly empty) result to the owner thread's mailbox.
    appendToVerificationResults(*job.verificationResults, SingleVerificationResult<TMatches>(localMatches, job.hitGroupId, job.windowNo));
    tls.options.timeVerification += sysTime() - start;

#ifdef RAZERS_PROFILE
    timelineEndTask(TASK_VERIFY);
#endif  // #ifdef RAZERS_PROFILE
}
313
// Write the verification results in verificationHits back into the
// thread-local match store tls.matches.
//
// With RAZERS_DEFER_COMPACTION the results are first sorted into their
// per-window buckets; then the longest run of complete buckets is flushed
// in window order, duplicates are masked, and the match filter is updated
// so that reads with enough matches can be disabled early (dontCompact is
// unused in this mode).  Without it, all results are appended directly
// and the store is compacted once it exceeds the compaction threshold
// (unless dontCompact is set).
template <
    typename TMatches,
    typename TFragmentStore,
    typename TFilterFinder,
    typename TFilterPattern,
    typename TShape /*TODO(holtgrew): Superflous.*/,
    typename TOptions,
    typename TCounts,
    typename TRazerSMode>
void
writeBackToLocal(ThreadLocalStorage<MapSingleReads<TMatches, TFragmentStore, TFilterFinder, TFilterPattern, TShape, TOptions, TCounts, TRazerSMode> > & tls, String<SingleVerificationResult<TMatches> > & verificationHits, bool dontCompact)
{
#ifdef RAZERS_PROFILE
    timelineBeginTask(TASK_WRITEBACK);
#endif  // #ifdef RAZERS_PROFILE
    if (tls.threadId == 0u && tls.options._debugLevel >= 3)
        fprintf(stderr, "[writeback]");
    typedef typename Size<typename TFragmentStore::TAlignedReadStore>::Type TAlignedReadStoreSize;

    TAlignedReadStoreSize oldSize = length(tls.matches);
    TAlignedReadStoreSize newSize = oldSize;

#ifdef RAZERS_DEFER_COMPACTION
    (void) dontCompact;  // unused

    // (1) Write back verification results into bucket.  Each delivered
    // result decrements the "missing" counter of its window.
    for (unsigned i = 0; i < length(verificationHits); ++i)
    {
        tls.verificationResultBuckets[verificationHits[i].windowNo][verificationHits[i].hitGroupId] = verificationHits[i];
        tls.missingInBucket[verificationHits[i].windowNo] -= 1;
    }

    unsigned const DELTA = getMaxDeviationOfOrder(tls.filterPattern);
    size_t firstBeginPos = std::numeric_limits<size_t>::max();  // Leftmost sort position, required later for masking.
    size_t firstWindowBegin = std::numeric_limits<size_t>::max();  // Leftmost window begin position.
    unsigned bucketsWrittenBack = 0;
    // (2) Write back the longest contiguous sequence of full buckets.
    for (; tls.nextBucketToWriteBack < length(tls.missingInBucket) && tls.missingInBucket[tls.nextBucketToWriteBack] == 0u; ++tls.nextBucketToWriteBack, ++bucketsWrittenBack)
    {
        // (2 a) Compute new size, reserve memory, copy data.
        size_t originalSize = length(tls.matches);
        unsigned idx = tls.nextBucketToWriteBack;
        for (unsigned i = 0; i < length(tls.verificationResultBuckets[idx]); ++i)
        {
            SEQAN_ASSERT_NEQ(tls.verificationResultBuckets[idx][i].matches.get(), static_cast<TMatches *>(0));
            newSize += length(*tls.verificationResultBuckets[idx][i].matches);
        }
        reserve(tls.matches, newSize);
        for (unsigned i = 0; i < length(tls.verificationResultBuckets[idx]); ++i)
        {
            if (!empty(*tls.verificationResultBuckets[idx][i].matches))
                append(tls.matches, *tls.verificationResultBuckets[idx][i].matches);
        }

        // (2 b) Get begin position for masking: scan backwards from the
        // old end of the store to find where this window's matches start.
        size_t beginPos = originalSize;
        if (beginPos > 0u)
            beginPos -= 1;
        size_t dPos = 1;
        // Exponential search backwards. After masking, reads are sorted by begin position.
        size_t windowBegin = tls.options.windowSize * idx;
        if (firstWindowBegin == std::numeric_limits<size_t>::max())
            firstWindowBegin = windowBegin;
        while (beginPos > 0u &&
               static_cast<size_t>(tls.matches[beginPos].beginPos) < windowBegin &&
               static_cast<size_t>(tls.matches[beginPos].beginPos + 10 * DELTA) > windowBegin)
        {
            if (beginPos > dPos)
                beginPos -= dPos;
            else
                beginPos = 0;
            dPos *= 2;
        }
        if (firstBeginPos == std::numeric_limits<size_t>::max())
            firstBeginPos = beginPos;

        // Do not mask duplicates if not in edit distance mode and not using pigeonhole filter
        if (!IsSameType<typename TRazerSMode::TGapMode, RazerSGapped>::VALUE && tls.options.threshold != 0)
            continue;

        // (2 c) Mask duplicates from beginPos to the end position.
        maskDuplicates(tls.matches, begin(tls.matches, Standard()) + beginPos, end(tls.matches, Standard()), tls.options, TRazerSMode());
    }

    if (bucketsWrittenBack > 0u)
    {
        // (3) Update match filter data structure for disabling reads.
        // Only matches that can no longer shift past nextWindowBegin are
        // registered/processed.
        size_t nextWindowBegin = tls.options.windowSize * (tls.nextBucketToWriteBack);
        if (tls.nextBucketToWriteBack == length(tls.missingInBucket))  // is last?
            nextWindowBegin += DELTA;
        typedef typename Iterator<TMatches>::Type TIterator;
        TIterator itBegin = begin(tls.matches, Standard()) + firstBeginPos;
        TIterator itEnd = end(tls.matches, Standard());
        TIterator it = itBegin;
        for (; it != itEnd && static_cast<size_t>(it->beginPos + DELTA) <= nextWindowBegin; ++it)
        {
            if (it->orientation == '-')
                continue;  // Skip masked reads.
            if (it->isRegistered)
                continue;  // Register each match only once.
            it->isRegistered = true;
            registerRead(*tls.matchFilter, it->readId, it->score);
        }
        itEnd = it;
        it = itBegin;
        unsigned disabled = 0;
        for (; it != itEnd; ++it)
        {
            if (it->orientation == '-')
                continue;  // Skip masked reads.
            disabled += processRead(*tls.matchFilter, it->readId);
        }
        if (tls.options._debugLevel >= 2 && disabled > 0)
            fprintf(stderr, " [%u reads disabled]", disabled);
    }
#else  // #ifdef RAZERS_DEFER_COMPACTION
    // Compute the target size, then append all result containers in one go.
    for (unsigned i = 0; i < length(verificationHits); ++i)
        newSize += length(*verificationHits[i].matches);

    reserve(tls.matches, newSize);

    // Write back all matches from verification to the block local store.
    for (unsigned i = 0; i < length(verificationHits); ++i)
        append(tls.matches, *verificationHits[i].matches);

    // Possibly compact matches.
    if (!dontCompact && length(tls.matches) > tls.options.compactThresh)
    {
#ifdef RAZERS_PROFILE
        timelineBeginTask(TASK_COMPACT);
#endif  // #ifdef RAZERS_PROFILE
        typedef typename TFragmentStore::TAlignedReadStore TAlignedReadStore;
        typename Size<TAlignedReadStore>::Type oldSize = length(tls.matches);

        if (IsSameType<typename TRazerSMode::TGapMode, RazerSGapped>::VALUE || tls.options.threshold == 0)
            maskDuplicates(tls.matches, tls.options, TRazerSMode());  // overlapping parallelograms cause duplicates

        compactMatches(tls.matches, tls.counts, tls.options, TRazerSMode(), tls, COMPACT);

        if (length(tls.matches) * 4 > oldSize)  // the threshold should not be raised if too many matches were removed
        {
            while (tls.options.compactThresh < oldSize)
                tls.options.compactThresh *= tls.options.compactMult;
            if (tls.threadId == 0u && tls.options._debugLevel >= 3)
                fprintf(stderr, "[raising threshold to %u]", unsigned(tls.options.compactThresh));
        }
#ifdef RAZERS_PROFILE
        timelineEndTask(TASK_COMPACT);
#endif  // #ifdef RAZERS_PROFILE
    }
#endif  // #ifdef RAZERS_DEFER_COMPACTION

#ifdef RAZERS_PROFILE
    timelineEndTask(TASK_WRITEBACK);
#endif  // #ifdef RAZERS_PROFILE
}
515
516 template <typename TMatches>
clearLocalMatches(String<TMatches * > & localMatches)517 void clearLocalMatches(String<TMatches *> & localMatches)
518 {
519 for (unsigned i = 0; i < length(localMatches); ++i)
520 delete localMatches[i];
521 clear(localMatches);
522 }
523
// Overload for shared_ptr-based verification results: clearing the string
// drops the references; the match containers are freed automatically when
// the last reference goes away.
template <typename TMatches>
void clearLocalMatches(String<SingleVerificationResult<TMatches> > & localMatches)
{
    clear(localMatches);
}
529
530 // Find read matches in one genome sequence.
531 //
532 // The parallelization is simple. We perform the filtering window-wise.
533 // Then, we generate verification jobs from the swift hits. The SWIFT hits
534 // are distributed by a simple static load balancing of adjacent hits. These
535 // are then processed by all "leading" threads, where leading threads are
536 // those with the largest number of processed windows.
537 template <
538 typename TFSSpec,
539 typename TFSConfig,
540 typename TThreadLocalStorages,
541 typename TContigId,
542 typename TCounts,
543 typename TSpec,
544 typename TShape,
545 typename TAlignMode,
546 typename TGapMode,
547 typename TScoreMode,
548 typename TMatchNPolicy,
549 typename TFilterSpec>
_mapSingleReadsParallelToContig(FragmentStore<TFSSpec,TFSConfig> & store,TThreadLocalStorages & threadLocalStorages,String<unsigned> const & splitters,TContigId const & contigId,TCounts &,char orientation,RazerSCoreOptions<TSpec> & options,TShape const &,RazerSMode<TAlignMode,TGapMode,TScoreMode,TMatchNPolicy> const &,TFilterSpec)550 void _mapSingleReadsParallelToContig(
551 FragmentStore<TFSSpec, TFSConfig> & store,
552 TThreadLocalStorages & threadLocalStorages,
553 String<unsigned> const & splitters,
554 TContigId const & contigId,
555 TCounts & /*cnts*/,
556 char orientation,
557 RazerSCoreOptions<TSpec> & options,
558 TShape const & /*shape*/,
559 RazerSMode<TAlignMode, TGapMode, TScoreMode, TMatchNPolicy> const & /*mode*/,
560 TFilterSpec)
561 {
562 #ifdef RAZERS_PROFILE
563 timelineBeginTask(TASK_ON_CONTIG);
564 #endif // #ifdef RAZERS_PROFILE
565 typedef FragmentStore<TFSSpec, TFSConfig> TFragmentStore;
566 typedef typename TFragmentStore::TContigSeq TContigSeq;
567 typedef typename TFragmentStore::TReadSeqStore TReadSeqStore;
568 typedef typename Value<TReadSeqStore>::Type const TRead;
569 typedef StringSet<TRead> TReadSet;
570 typedef Index<TReadSet, IndexQGram<TShape, OpenAddressing> > TIndex; // q-gram index
571 //typedef typename Size<TReadSeqStore>::Type TSize;
572
573 //typedef RazerSMode<TAlignMode, TGapMode, TScoreMode, TMatchNPolicy> TRazerSMode;
574 typedef RazerSCoreOptions<TSpec> TOptions;
575
576 typedef typename Value<TThreadLocalStorages>::Type TThreadLocalStorage;
577 typedef typename TThreadLocalStorage::TMatches TMatches;
578
579 // TODO(holtgrew): What about cnts, mode?
580
581 typedef Finder<TContigSeq, TFilterSpec> TFilterFinder;
582 typedef Pattern<TIndex, TFilterSpec> TFilterPattern;
583
584 typedef RazerSCoreOptions<TSpec> TOptions;
585
586 typedef typename WindowFindResult<TFilterFinder>::Type THitString;
587 typedef Job<SingleVerification<TMatches, TFragmentStore, THitString, TOptions, TFilterPattern> > TVerificationJob;
588
589 // Debug output...
590 if (options._debugLevel >= 1)
591 {
592 std::cerr << std::endl << "Process genome seq #" << contigId;
593 if (orientation == 'F')
594 std::cerr << "[fwd]";
595 else
596 std::cerr << "[rev]";
597 }
598
599 // -----------------------------------------------------------------------
600 // Reverse-complement contig if necessary.
601 // -----------------------------------------------------------------------
602 TContigSeq & contigSeq = store.contigStore[contigId].seq;
603 #ifdef RAZERS_PROFILE
604 timelineBeginTask(TASK_REVCOMP);
605 #endif // #ifdef RAZERS_PROFILE
606 if (orientation == 'R')
607 reverseComplement(contigSeq);
608 #ifdef RAZERS_PROFILE
609 timelineEndTask(TASK_REVCOMP);
610 #endif // #ifdef RAZERS_PROFILE
611
612 // -----------------------------------------------------------------------
613 // Per-contig initialization of thread local storage objects.
614 // -----------------------------------------------------------------------
615 // TODO(holtgrew): Maybe put into its own function?
616 for (unsigned i = 0; i < options.threadCount; ++i)
617 {
618 // Initialize verifier object.
619 threadLocalStorages[i].verifier.onReverseComplement = (orientation == 'R');
620 threadLocalStorages[i].verifier.genomeLength = length(contigSeq);
621 threadLocalStorages[i].verifier.oneMatchPerBucket = false;
622 threadLocalStorages[i].verifier.m.contigId = contigId;
623 }
624
625 // -----------------------------------------------------------------------
626 // Perform filtration.
627 // -----------------------------------------------------------------------
628 TaskQueue<TVerificationJob, OmpLock> taskQueue;
629 volatile unsigned leaderWindowsDone = 0; // Number of windows done in leaders.
630 volatile unsigned threadsFiltering = options.threadCount;
631
632 // We will create the swift finder for thread 0 first. This will trigger parallel repeat finding in the SWIFT
633 // finder construction. Then, we copy over the finder to all threads.
634 #ifdef RAZERS_PROFILE
635 timelineBeginTask(TASK_COPY_FINDER);
636 #endif // #ifdef RAZERS_PROFILE
637
638 threadLocalStorages[0].filterFinder = TFilterFinder(store.contigStore[contigId].seq, threadLocalStorages[0].options.repeatLength, 1);
639
640 #ifdef RAZERS_PROFILE
641 timelineEndTask(TASK_COPY_FINDER);
642 #endif // #ifdef RAZERS_PROFILE
643
644 SEQAN_OMP_PRAGMA(parallel)
645 {
646 unsigned windowsDone = 0;
647
648 // Initialization.
649 TThreadLocalStorage & tls = threadLocalStorages[omp_get_thread_num()];
650 #ifdef RAZERS_PROFILE
651 timelineBeginTask(TASK_COPY_FINDER);
652 #endif // #ifdef RAZERS_PROFILE
653 // tls.filterFinder = TFilterFinder(store.contigStore[contigId].seq, tls.options.repeatLength, 1);
654 if (omp_get_thread_num() != 0)
655 tls.filterFinder = threadLocalStorages[0].filterFinder;
656
657 // wait until everyone copied the filter of thread 0 before it is changed
658 SEQAN_OMP_PRAGMA(barrier)
659
660 #ifdef RAZERS_PROFILE
661 timelineEndTask(TASK_COPY_FINDER);
662 timelineBeginTask(TASK_FILTER);
663 #endif // #ifdef RAZERS_PROFILE
664
665 if (!windowFindBegin(tls.filterFinder, tls.filterPattern, tls.options.errorRate))
666 std::cerr << "ERROR: windowFindBegin() failed in thread " << tls.threadId << std::endl;
667
668 #ifdef RAZERS_PROFILE
669 timelineEndTask(TASK_FILTER);
670 #endif // #ifdef RAZERS_PROFILE
671
672 // Pre-allocate buckets.
673 tls.nextBucketToWriteBack = 0;
674 resize(tls.verificationResultBuckets, unsigned(ceil(1.0 * (length(contigSeq) + 1000) / options.windowSize))); // +1000 for the case where contig seq length is multiple of window size
675 // SEQAN_OMP_PRAGMA(critical)
676 // std::cerr << "window count: " << length(tls.verificationResultBuckets) << std::endl;
677 clear(tls.missingInBucket);
678 resize(tls.missingInBucket, length(tls.verificationResultBuckets), std::numeric_limits<unsigned>::max());
679
680 // For each filtration window...
681 bool hasMore = !empty(host(tls.filterFinder));
682 while (hasMore)
683 {
684 #ifdef RAZERS_PROFILE
685 timelineBeginTask(TASK_FILTER);
686 #endif // #ifdef RAZERS_PROFILE
687 double filterStart = sysTime();
688 // fprintf(stderr, "[filter]");
689 hasMore = windowFindNext(tls.filterFinder, tls.filterPattern, tls.options.windowSize);
690 // std::cerr << "FILTERING WINDOW " << windowsDone << std::endl << "\t" << tls.options.windowSize;
691
692 windowsDone += 1; // Local windows done count.
693 atomicMax(leaderWindowsDone, windowsDone);
694
695 std::shared_ptr<THitString> hitsPtr(new THitString()); //TODO (weese:) Could we reuse memory here?
696 resize(*hitsPtr, 1);
697 clear(*hitsPtr);
698 using std::swap;
699 swap(*hitsPtr, getWindowFindHits(tls.filterFinder));
700 THitString & hits = *hitsPtr;
701 tls.options.countFiltration += length(hits);
702 // std::cerr << " HITS: " << length(hits) << std::endl;
703 // if (length(hits) > 0u)
704 // std::cerr << " RANGE " << front(hits).hstkPos << "\t" << back(hits).bucketWidth << std::endl;
705
706 // Enqueue verification jobs.
707 if (length(hits) > 0u)
708 {
709 // Compute splitters, given a verification package size and a
710 // bound on the package count.
711 String<unsigned> splitters;
712 unsigned packageCount = tls.options.maxVerificationPackageCount * omp_get_max_threads();
713 computeSplittersBySlotSize(splitters, length(hits), tls.options.verificationPackageSize, packageCount);
714
715 // Push verification jobs to the job queue.
716 String<TVerificationJob> jobs;
717 reserve(jobs, length(splitters) - 1);
718 for (unsigned i = 1; i < length(splitters); ++i)
719 {
720 // SEQAN_OMP_PRAGMA(critical)
721 // std::cerr << "\n";
722 appendValue(jobs, TVerificationJob(tls.threadId, tls.verificationResults, store, contigId, windowsDone - 1, hitsPtr, i - 1, splitters[i - 1], splitters[i], *tls.globalOptions, tls.filterPattern));
723 // SEQAN_OMP_PRAGMA(critical)
724 // std::cerr << "new job(" << tls.threadId << ", tls.verificationResults, store, " << contigId << ", " << windowsDone - 1 << ", hitsPtr, " << i - 1 << ", " << splitters[i - 1] << ", " << splitters[i] << ", *tls.globalOptions, tls.filterPattern)" << std::endl;
725 }
726 pushFront(taskQueue, jobs);
727
728 // Preallocate space in bucket and initialize "to do" counter.
729 clear(tls.verificationResultBuckets[windowsDone - 1]);
730 resize(tls.verificationResultBuckets[windowsDone - 1], length(splitters) - 1);
731 tls.missingInBucket[windowsDone - 1] = length(splitters) - 1;
732 // for (unsigned i = 0; i < length(splitters) - 1; ++i)
733 // tls.missingInBucket[windowsDone - 1] -= splitters[i] == splitters[i + 1];
734 }
735 else
736 {
737 tls.missingInBucket[windowsDone - 1] = 0;
738 }
739 tls.options.timeFiltration += sysTime() - filterStart;
740 #ifdef RAZERS_PROFILE
741 timelineEndTask(TASK_FILTER);
742 #endif // #ifdef RAZERS_PROFILE
743
744 // Perform verification as long as we are a leader and there are filtration jobs to perform.
745 while (leaderWindowsDone == windowsDone)
746 {
747 TVerificationJob job;
748 if (!popFront(job, taskQueue))
749 break;
750 // fprintf(stderr, "[verify]");
751 workVerification(tls, job, splitters);
752 }
753
754 // Write back verification results for this thread so far.
755 //
756 // First, swap out the current set of local stores from the verification results.
757 omp_set_lock(&tls.verificationResults.lock->lock_);
758 String<SingleVerificationResult<TMatches> > localMatches;
759 // SEQAN_OMP_PRAGMA(critical)
760 // std::cerr << "thread " << omp_get_thread_num() << " SWAPPING " << &tls.verificationResults.localMatches << "\n";
761 swap(localMatches, tls.verificationResults.localMatches);
762 omp_unset_lock(&tls.verificationResults.lock->lock_);
763 // Don't compact matches if in configured 'block fraction' of genome.
764 size_t hstckLen = tls.filterFinder.endPos - tls.filterFinder.startPos;
765 size_t hstckLeft = tls.filterFinder.endPos - tls.filterFinder.curPos;
766 double fracTodo = 1.0 * hstckLeft / hstckLen;
767 bool dontCompact = tls.options.noCompactFrac >= fracTodo;
768 // Write back the contents of these stores to the thread-local store.
769 writeBackToLocal(tls, localMatches, dontCompact);
770 // #ifndef RAZERS_DEFER_COMPACTION
771 clearLocalMatches(localMatches);
772 // #endif // #ifndef RAZERS_DEFER_COMPACTION
773 }
774
775 // Finalization
776 windowFindEnd(tls.filterFinder, tls.filterPattern);
777
778 SEQAN_OMP_PRAGMA(atomic)
779 threadsFiltering -= 1;
780
781 // Continue to try to help verify.
782 while (threadsFiltering > 0u)
783 {
784 TVerificationJob job;
785 if (popFront(job, taskQueue))
786 workVerification(tls, job, splitters);
787 }
788
789 // After every thread is done with everything, write back once more.
790 SEQAN_OMP_PRAGMA(barrier)
791 writeBackToLocal(tls, tls.verificationResults.localMatches, true);
792 // #ifndef RAZERS_DEFER_COMPACTION
793 clearLocalMatches(tls.verificationResults.localMatches);
794 // #endif // #ifndef RAZERS_DEFER_COMPACTION
795 // std::cerr << "AT END OF CONTIG #alignments " << length(tls.matches) << std::endl;
796 }
797
798
799 // NOTE:
800 // We never undo the reverse-complementing!
801 // It is not necessary as the contigs are freed by unlockAndFreeContig
802 // They are loaded again in dumpMatches if necessary
803 //
804 // if (!unlockAndFreeContig(store, contigId)) // if the contig is still used
805 // if (orientation == 'R') reverseComplement(contigSeq); // we have to restore original orientation
806 #ifdef RAZERS_PROFILE
807 timelineEndTask(TASK_ON_CONTIG);
808 #endif // #ifdef RAZERS_PROFILE
809 }
810
811 // Global initialization of block local storages.
template <typename TThreadLocalStorages, typename TFragmentStore, typename TSplitters, typename TShape, typename TOptions>
void initializeThreadLocalStoragesSingle(TThreadLocalStorages & threadLocalStorages,
                                         TFragmentStore /*const*/ & store,
                                         TSplitters const & splitters,
                                         TShape /*const*/ & shape,
                                         TOptions /*const*/ & options)
{
    // Create and initialize one thread-local storage (TLS) per thread for
    // single-end mapping.  splitters has threadCount + 1 entries and assigns
    // thread i the read id range [splitters[i], splitters[i + 1]).
    SEQAN_ASSERT_GT(length(splitters), 1u);
    int threadCount = length(splitters) - 1;

    typedef typename Value<TThreadLocalStorages>::Type TThreadLocalStorage;
    typedef typename TThreadLocalStorage::TFilterPattern TFilterPattern;
    typedef typename Host<TFilterPattern>::Type TIndex;
    typedef typename Position<typename TFragmentStore::TContigStore>::Type TPosition;

    resize(threadLocalStorages, threadCount);
    // static,1 schedule: thread i initializes (and first-touches) storage i.
    SEQAN_OMP_PRAGMA(parallel for schedule(static, 1))
    for (int i = 0; i < threadCount; ++i)
    {
        TThreadLocalStorage & tls = threadLocalStorages[i];

        tls.threadId = i;
        tls.globalStore = &store;
        tls.shape = shape;
        tls.options = options;  // TODO(holtgrew): Copy for stats and threshold, really good?
        tls.globalOptions = &options;
        tls.splitters = splitters;

#ifdef RAZERS_DEFER_COMPACTION
        // Deferred compaction: install a histogram-based match filter sized
        // for this thread's read slice and disable the eager compaction
        // threshold by setting it to "infinity".
        typedef typename TThreadLocalStorage::TMatchFilter TMatchFilter;
        double READ_FRAC_WITH_HISTO = 0.01;
        tls.matchFilter.reset(new TMatchFilter(tls.splitters[tls.threadId + 1] - tls.splitters[tls.threadId], options.matchHistoStartThreshold, READ_FRAC_WITH_HISTO, tls, tls.splitters[tls.threadId], tls.globalStore->readSeqStore, tls.options));
        tls.options.compactThresh = std::numeric_limits<unsigned>::max();
#endif  // #ifdef RAZERS_DEFER_COMPACTION

        // Clear pattern and set parameters.
        TFilterPattern & filterPattern = tls.filterPattern;
        clear(filterPattern);

        // Initialize the q-gram index: fill its text with this thread's
        // slice of the global read sequences.
        TIndex & index = host(tls.filterPattern);
        clear(index);

        clear(indexText(index));
        for (TPosition j = splitters[i]; j < splitters[i + 1]; ++j)
            appendValue(indexText(index), store.readSeqStore[j]);
        index.shape = shape;

#ifdef RAZERS_OPENADDRESSING
        // Load factor of the open-addressing q-gram hash table.
        index.alpha = options.loadFactor;
#endif
        cargo(index).abundanceCut = options.abundanceCut;
        cargo(index)._debugLevel = options._debugLevel;

        // Configure filter pattern
        // (if this is a pigeonhole filter, all sequences must be appended first)
        _applyFilterOptions(filterPattern, options);

        // Only thread 0 prints progress dots, and only in verbose mode.
        tls.filterPattern.params.printDots = (tls.threadId == 0) && (tls.options._debugLevel > 0);
    }
}
877
878 // Write back from thread local storages to global store.
879 template <typename TFragmentStore,
880 typename TThreadLocalStorages>
881 void
writeBackToGlobalStore(TFragmentStore & target,TThreadLocalStorages & threadLocalStorages,bool isSingleEnd)882 writeBackToGlobalStore(
883 TFragmentStore & target,
884 TThreadLocalStorages /*const*/ & threadLocalStorages,
885 bool isSingleEnd) // begin/end already swapped for paired-end reads
886 {
887 typedef typename Size<typename TFragmentStore::TAlignedReadStore>::Type TAlignedReadStoreSize;
888 typedef typename Value<typename TFragmentStore::TAlignedReadStore>::Type TAlignedReadStoreElem;
889 typedef typename Value<typename TFragmentStore::TAlignQualityStore>::Type TAlignedQualStoreElem;
890 typedef typename Value<TThreadLocalStorages>::Type TThreadLocalStorage;
891 typedef typename TThreadLocalStorage::TMatches TMatches;
892 typedef typename Iterator<TMatches, Standard>::Type TMatchesIterator;
893
894 // Update the IDs and calculate new size so the prefix increment can be
895 // used in the loops.
896 TAlignedReadStoreSize oldSize = length(target.alignedReadStore);
897 TAlignedReadStoreSize newSize = oldSize;
898
899 for (unsigned i = 0; i < length(threadLocalStorages); ++i)
900 newSize += length(threadLocalStorages[i].matches);
901
902 // Resize first so copying happens at most once and not every for each
903 // block in the worst case
904 resize(target.alignedReadStore, newSize);
905 resize(target.alignQualityStore, newSize);
906
907 // Append single block stores.
908 // TODO(holtgrew): Do in parallel!
909 for (unsigned i = 0; i < length(threadLocalStorages); ++i)
910 {
911 TMatchesIterator it = begin(threadLocalStorages[i].matches, Standard());
912 TMatchesIterator itEnd = end(threadLocalStorages[i].matches, Standard());
913 for (; it != itEnd; ++it, ++oldSize)
914 {
915 using std::swap;
916 if (isSingleEnd && it->orientation == 'R')
917 swap(it->beginPos, it->endPos);
918 target.alignedReadStore[oldSize] = TAlignedReadStoreElem(oldSize, it->readId, it->contigId, it->beginPos, it->endPos);
919 if (!isSingleEnd)
920 SEQAN_ASSERT_NEQ(it->pairMatchId, Value<TMatches>::Type::INVALID_ID);
921 target.alignedReadStore[oldSize].pairMatchId = it->pairMatchId;
922 target.alignQualityStore[oldSize] = TAlignedQualStoreElem(it->pairScore, it->score, -it->score);
923 }
924 }
925 }
926
// Convenience overload: write back thread-local matches assuming
// single-end reads (delegates with isSingleEnd == true).
template <typename TFragmentStore,
          typename TThreadLocalStorages>
void
writeBackToGlobalStore(TFragmentStore & target,
                       TThreadLocalStorages /*const*/ & threadLocalStorages)
{
    writeBackToGlobalStore(target, threadLocalStorages, /*isSingleEnd=*/ true);
}
936
937 // Performs splitting of reads, initialization of OpenMP and the calls
938 // mapSingleReadsParallelToContig for each contig.
939 template <
940 typename TFSSpec,
941 typename TFSConfig,
942 typename TCounts,
943 typename TSpec,
944 typename TShape,
945 typename TAlignMode,
946 typename TGapMode,
947 typename TScoreMode,
948 typename TMatchNPolicy,
949 typename TFilterSpec>
_mapSingleReadsParallel(FragmentStore<TFSSpec,TFSConfig> & store,TCounts & cnts,RazerSCoreOptions<TSpec> & options,TShape const & shape,RazerSMode<TAlignMode,TGapMode,TScoreMode,TMatchNPolicy> const & mode,TFilterSpec)950 int _mapSingleReadsParallel(
951 FragmentStore<TFSSpec, TFSConfig> & store,
952 TCounts & cnts,
953 RazerSCoreOptions<TSpec> & options,
954 TShape const & shape,
955 RazerSMode<TAlignMode, TGapMode, TScoreMode, TMatchNPolicy> const & mode,
956 TFilterSpec)
957 {
958 typedef FragmentStore<TFSSpec, TFSConfig> TFragmentStore;
959 typedef typename TFragmentStore::TReadSeqStore TReadSeqStore;
960 typedef typename Value<TReadSeqStore>::Type const TRead;
961 typedef StringSet<TRead> TReadSet;
962 typedef Index<TReadSet, IndexQGram<TShape, OpenAddressing> > TIndex; // q-gram index
963 //typedef typename Size<TReadSeqStore>::Type TSize;
964
965 typedef Pattern<TIndex, TFilterSpec> TFilterPattern;
966 //typedef Pattern<TRead, MyersUkkonen> TMyersPattern; // verifier
967 typedef RazerSCoreOptions<TSpec> TOptions;
968
969 typedef RazerSMode<TAlignMode, TGapMode, TScoreMode, TMatchNPolicy> TRazerSMode;
970 typedef typename TFragmentStore::TContigSeq TContigSeq;
971 typedef Finder<TContigSeq, TFilterSpec> TFilterFinder;
972
973 typedef typename TFragmentStore::TContigSeq TContigSeq;
974 typedef typename Position<TContigSeq>::Type TContigPos;
975 typedef MatchRecord<TContigPos> TMatchRecord;
976 typedef String<TMatchRecord> TMatches;
977
978 // -----------------------------------------------------------------------
979 // Initialize global information.
980 // -----------------------------------------------------------------------
981
982 // Save OpenMP maximal thread count so we can restore it below, then set
983 // from options.
984 if (options._debugLevel >= 1)
985 std::cerr << std::endl << "Number of threads: \t" << options.threadCount << std::endl;
986 int oldMaxThreads = omp_get_max_threads();
987 omp_set_num_threads(options.threadCount);
988
989 #ifdef RAZERS_PROFILE
990 timelineBeginTask(TASK_INIT);
991 double beginInit = sysTime();
992 #endif // #ifdef RAZERS_PROFILE
993
994 // Clear/initialize global stats.
995 options.countFiltration = 0;
996 options.countVerification = 0;
997 options.timeMapReads = 0;
998 options.timeDumpResults = 0;
999
1000 // -----------------------------------------------------------------------
1001 // Initialize thread local storages.
1002 // -----------------------------------------------------------------------
1003 SEQAN_PROTIMESTART(initTime);
1004 String<unsigned> splitters;
1005 computeSplittersBySlotCount(splitters, length(store.readNameStore), options.threadCount);
1006 typedef ThreadLocalStorage<MapSingleReads<TMatches, TFragmentStore, TFilterFinder, TFilterPattern, TShape, TOptions, TCounts, TRazerSMode> > TThreadLocalStorage;
1007 String<TThreadLocalStorage> threadLocalStorages;
1008 initializeThreadLocalStoragesSingle(threadLocalStorages, store, splitters, shape, options);
1009
1010 #ifdef RAZERS_PROFILE
1011 double endInit = sysTime();
1012 std::cerr << "TIME initialization: " << (endInit - beginInit) << " s";
1013 timelineEndTask(TASK_INIT);
1014 #endif // #ifdef RAZERS_PROFILE
1015 double timeInitialization = SEQAN_PROTIMEDIFF(initTime);
1016 if (options._debugLevel >= 1)
1017 std::cerr << std::endl << "Initialization took \t" << timeInitialization << " seconds" << std::endl;
1018
1019 // -----------------------------------------------------------------------
1020 // Perform parallel mapping.
1021 // -----------------------------------------------------------------------
1022
1023 // Save compaction threshold and set global threshold to infinity, so matchVerify does not compact!
1024 int oldThreshold = options.compactThresh;
1025 options.compactThresh = std::numeric_limits<unsigned>::max();
1026
1027 // For each contig: Map reads in parallel.
1028 SEQAN_PROTIMESTART(findTime);
1029 for (unsigned contigId = 0; contigId < length(store.contigStore); ++contigId)
1030 {
1031 lockContig(store, contigId);
1032 if (options.forward)
1033 _mapSingleReadsParallelToContig(store, threadLocalStorages, splitters, contigId, cnts, 'F', options, shape, mode, TFilterSpec());
1034 if (options.reverse)
1035 _mapSingleReadsParallelToContig(store, threadLocalStorages, splitters, contigId, cnts, 'R', options, shape, mode, TFilterSpec());
1036 unlockAndFreeContig(store, contigId);
1037 }
1038 double endMapping = sysTime();
1039 #ifdef RAZERS_PROFILE
1040 std::cerr << std::endl << "TIME mapping: " << (endMapping - endInit) << " s" << std::endl;
1041 #endif // #ifdef RAZERS_PROFILE
1042
1043 #ifdef RAZERS_EXTERNAL_MATCHES
1044 // Compute whether to use slow, sequential sorting or parallel in-memory sorting.
1045 uint64_t totalMatchCount = 0;
1046 uint64_t maxMatchCount = 0;
1047 for (unsigned i = 0; i < length(threadLocalStorages); ++i)
1048 {
1049 totalMatchCount += length(threadLocalStorages[i].matches);
1050 maxMatchCount = _max(maxMatchCount, length(threadLocalStorages[i].matches));
1051 }
1052 bool useExternalSort = false;
1053 bool useSequentialCompaction = false;
1054 if (options.availableMatchesMemorySize == -1)
1055 {
1056 useExternalSort = true;
1057 }
1058 else if (options.availableMatchesMemorySize != 0)
1059 {
1060 typedef typename Value<TMatches>::Type TMatch;
1061 int64_t totalMemoryRequired = sizeof(TMatch) * totalMatchCount;
1062 int64_t maxMemoryRequired = sizeof(TMatch) * maxMatchCount;
1063 if (options.availableMatchesMemorySize < totalMemoryRequired)
1064 {
1065 if (options.availableMatchesMemorySize < maxMemoryRequired)
1066 {
1067 useExternalSort = true;
1068 }
1069 else
1070 {
1071 useSequentialCompaction = true;
1072 }
1073 }
1074 }
1075
1076 // std::cerr << "useExternalSort == " << useExternalSort << "\n"
1077 // << "useSequentialCompaction == " << useSequentialCompaction << "\n";
1078
1079 // Switch between using parallel compaction, sequential compaction, and
1080 // sequential compaction with external sorting. The actual switch for the
1081 // sorting is in function compactMatches().
1082 if (useSequentialCompaction || useExternalSort)
1083 {
1084 for (unsigned i = 0; i < length(threadLocalStorages); ++i)
1085 {
1086 // remove duplicates when mapping with gaps or using pigeonhole filter
1087 if (IsSameType<TGapMode, RazerSGapped>::VALUE || options.threshold == 0)
1088 maskDuplicates(threadLocalStorages[omp_get_thread_num()].matches, options, mode);
1089 Nothing nothing;
1090 CompactMatchesMode compactMode = useSequentialCompaction ? COMPACT_FINAL : COMPACT_FINAL_EXTERNAL;
1091 // std::cerr << "BEFORE FINAL COMPACTION " << length(threadLocalStorages[omp_get_thread_num()].matches) << std::endl;
1092 compactMatches(threadLocalStorages[omp_get_thread_num()].matches, cnts, options, mode, nothing, compactMode);
1093 // std::cerr << "AFTER FINAL COMPACTION " << length(threadLocalStorages[omp_get_thread_num()].matches) << std::endl;
1094 }
1095 }
1096 else
1097 {
1098 #endif // #ifdef RAZERS_EXTERNAL_MATCHES
1099 SEQAN_OMP_PRAGMA(parallel)
1100 {
1101 // TODO(holtgrew): We would really like to stop the additional masking step, the incremental masking SHOULD have taken care of this. Thus, the following should be ifndef and not ifdef.
1102 #ifndef RAZERS_DEFER_COMPACTION
1103 // remove duplicates when mapping with gaps or using pigeonhole filter
1104 if (IsSameType<TGapMode, RazerSGapped>::VALUE || options.threshold == 0)
1105 maskDuplicates(threadLocalStorages[omp_get_thread_num()].matches, options, mode);
1106 #endif // #ifndef RAZERS_DEFER_COMPACTION
1107 Nothing nothing;
1108 // std::cerr << "BEFORE FINAL COMPACTION " << length(threadLocalStorages[omp_get_thread_num()].matches) << std::endl;
1109 // SEQAN_OMP_PRAGMA(critical)
1110 // std::cerr << "BEFORE FINAL COMPACTION " << length(threadLocalStorages[omp_get_thread_num()].matches) << std::endl;
1111 compactMatches(threadLocalStorages[omp_get_thread_num()].matches, cnts, options, mode, nothing, COMPACT_FINAL);
1112 // std::cerr << "AFTER FINAL COMPACTION " << length(threadLocalStorages[omp_get_thread_num()].matches) << std::endl;
1113 // SEQAN_OMP_PRAGMA(critical)
1114 // std::cerr << "AFTER FINAL COMPACTION " << length(threadLocalStorages[omp_get_thread_num()].matches) << std::endl;
1115 }
1116 SEQAN_OMP_PRAGMA(barrier)
1117 #ifdef RAZERS_EXTERNAL_MATCHES
1118 }
1119
1120 #endif // #ifdef RAZERS_EXTERNAL_MATCHES
1121
1122 // Write back local stores to global stores.
1123 writeBackToGlobalStore(store, threadLocalStorages);
1124 // std::cerr << "length(threadLocalStorages[0].matches) == " << length(threadLocalStorages[0].matches) << std::endl;
1125 // std::cerr << "length(store.alignedReadStore) == " << length(store.alignedReadStore) << std::endl;
1126 double endWriteback = sysTime();
1127 options.timeFsCopy = endWriteback - endMapping;
1128
1129 // TODO(holtgrew): Sum up cnts?!
1130
1131 // -----------------------------------------------------------------------
1132 // Collect global statistics, cleanup.
1133 // -----------------------------------------------------------------------
1134
1135 // Restore old compaction threshold.
1136 options.compactThresh = oldThreshold;
1137
1138 // Add up thread-local filtration and verification counters and print totals.
1139 for (unsigned i = 0; i < length(threadLocalStorages); ++i)
1140 {
1141 options.countFiltration += threadLocalStorages[i].options.countFiltration;
1142 options.countVerification += threadLocalStorages[i].options.countVerification;
1143 }
1144
1145 if (options._debugLevel >= 1)
1146 {
1147 for (unsigned i = 0; i < length(threadLocalStorages); ++i)
1148 {
1149 std::cerr << "Thread #" << i << std::endl;
1150 std::cerr << " Masking duplicates took \t" << threadLocalStorages[i].options.timeMaskDuplicates << " seconds" << std::endl;
1151 std::cerr << " Compacting matches took \t" << threadLocalStorages[i].options.timeCompactMatches << " seconds" << std::endl;
1152 std::cerr << " Time for filtration \t" << threadLocalStorages[i].options.timeFiltration << " seconds" << std::endl;
1153 std::cerr << " Time for verifications \t" << threadLocalStorages[i].options.timeVerification << " seconds" << std::endl;
1154 }
1155 std::cerr << "Time for copying back \t" << options.timeFsCopy << " seconds" << std::endl;
1156 }
1157
1158 options.timeMapReads = SEQAN_PROTIMEDIFF(findTime);
1159 if (options._debugLevel >= 1)
1160 std::cerr << std::endl << "Finding reads took \t" << options.timeMapReads << " seconds" << std::endl;
1161 if (options._debugLevel >= 1)
1162 {
1163 std::cerr << std::endl;
1164 std::cerr << "___FILTRATION_STATS____" << std::endl;
1165 std::cerr << "Filtration counter: " << options.countFiltration << std::endl;
1166 std::cerr << "Successful verifications: " << options.countVerification << std::endl;
1167 }
1168
1169 // Restore global state.
1170 omp_set_num_threads(oldMaxThreads);
1171
1172 return 0;
1173 }
1174
1175 // Performs splitting of reads, initialization of OpenMP and the calls
1176 // mapSingleReadsParallelToContig for each contig.
1177 template <
1178 typename TFSSpec,
1179 typename TFSConfig,
1180 typename TCounts,
1181 typename TSpec,
1182 typename TShape,
1183 typename TAlignMode,
1184 typename TGapMode,
1185 typename TScoreMode,
1186 typename TMatchNPolicy>
_mapSingleReadsParallel(FragmentStore<TFSSpec,TFSConfig> & store,TCounts & cnts,RazerSCoreOptions<TSpec> & options,TShape const & shape,RazerSMode<TAlignMode,TGapMode,TScoreMode,TMatchNPolicy> const & mode)1187 int _mapSingleReadsParallel(
1188 FragmentStore<TFSSpec, TFSConfig> & store,
1189 TCounts & cnts,
1190 RazerSCoreOptions<TSpec> & options,
1191 TShape const & shape,
1192 RazerSMode<TAlignMode, TGapMode, TScoreMode, TMatchNPolicy> const & mode)
1193 {
1194 if (options.threshold > 0)
1195 {
1196 typedef typename If<IsSameType<TGapMode, RazerSGapped>, SwiftSemiGlobal, SwiftSemiGlobalHamming>::Type TSwiftSpec;
1197 return _mapSingleReadsParallel(store, cnts, options, shape, mode, Swift<TSwiftSpec>());
1198 }
1199 else
1200 {
1201 typedef typename If<IsSameType<TGapMode, RazerSGapped>, void, Hamming_>::Type TPigeonholeSpec;
1202 return _mapSingleReadsParallel(store, cnts, options, Shape<Dna, OneGappedShape>(), mode, Pigeonhole<TPigeonholeSpec>());
1203 }
1204 }
1205
1206 } // namespace seqan
1207
1208 #endif // #ifndef RAZERS_RAZERS_PARALLEL_H_
1209