1 #ifndef RAZERS_RAZERS_PARALLEL_H_
2 #define RAZERS_RAZERS_PARALLEL_H_
3 
4 // TODO(holtgrew): Ideally, we do not need any locks.
5 
#include <memory>

#include <omp.h>

#include "parallel_misc.h"
#include "parallel_job_queue.h"
#include "razers_match_filter.h"
9 
10 namespace seqan {
11 
12 // ===========================================================================
13 // Enums, Tags, Classes, Specializations
14 // ===========================================================================
15 
16 template <typename TSpec>
17 class Lock;
18 
19 struct Omp_;
20 typedef Tag<Omp_> Omp;
21 
22 template <>
23 class Lock<Omp>
24 {
25 public:
26     omp_lock_t lock_;
27 
Lock()28     Lock() { omp_init_lock(&lock_); }
29 
~Lock()30     ~Lock() { omp_destroy_lock(&lock_); }
31 };
32 
33 template <typename TMatches>
34 struct SingleVerificationResult
35 {
36     std::shared_ptr<TMatches> matches;
37     unsigned hitGroupId;
38     unsigned windowNo;
39 
SingleVerificationResultSingleVerificationResult40     SingleVerificationResult() :
41         hitGroupId(0), windowNo(0)
42     {}
43 
SingleVerificationResultSingleVerificationResult44     SingleVerificationResult(std::shared_ptr<TMatches> & matches_, unsigned hitGroupId_, unsigned windowNo_) :
45         matches(matches_), hitGroupId(hitGroupId_), windowNo(windowNo_)
46     {}
47 };
48 
49 // Stores the results of the verification.
50 //
51 // Put into its own class so it can be locked independently of other class
52 // members.
53 template <typename TMatches>
54 class SingleVerificationResults
55 {
56 public:
57     String<SingleVerificationResult<TMatches> > localMatches;
58     Lock<Omp> * lock;
59 
SingleVerificationResults()60     SingleVerificationResults() :
61         lock(new Lock<Omp>()) {}
62 
SingleVerificationResults(SingleVerificationResults const & other)63     SingleVerificationResults(SingleVerificationResults const & other) :
64         localMatches(other.localMatches), lock(new Lock<Omp>())
65     {
66         // Not thread-safe copying since this is only used at the beginning when resizing block local storages string.
67     }
68 
69     SingleVerificationResults & operator=(SingleVerificationResults const & other)
70     {
71         if (this == &other)
72             return *this;
73 
74         localMatches = other.localMatches;
75         return *this;
76     }
77 
~SingleVerificationResults()78     ~SingleVerificationResults()
79     {
80         delete lock;
81     }
82 
83 };
84 
85 template <
86     typename TMatches,
87     typename TFragmentStore,
88     typename TFilterFinder,
89     typename TFilterPattern,
90     typename TShape /*TODO(holtgrew): Superflous.*/,
91     typename TOptions,
92     typename TCounts,
93     typename TRazerSMode>
94 struct MapSingleReads {};
95 
96 template <typename TSpec>
97 class ThreadLocalStorage;
98 
99 // ThreadLocalStorage specialization for single-end read mapping in RazerS.
100 template <
101     typename TMatches_,
102     typename TFragmentStore,
103     typename TFilterFinder_,
104     typename TFilterPattern_,
105     typename TShape /*TODO(holtgrew): Superflous.*/,
106     typename TOptions,
107     typename TCounts,
108     typename TRazerSMode
109     >
110 class ThreadLocalStorage<
111     MapSingleReads<
112         TMatches_,
113         TFragmentStore,
114         TFilterFinder_,
115         TFilterPattern_,
116         TShape,
117         TOptions,
118         TCounts,
119         TRazerSMode> >
120 {
121 public:
122     typedef TFilterPattern_ TFilterPattern;
123     typedef TFilterFinder_ TFilterFinder;
124 
125     typedef TMatches_ TMatches;
126 #ifdef RAZERS_EXTERNAL_MATCHES
127     typedef TMatches TLargeMatches;
128 #else // #ifdef RAZERS_EXTERNAL_MATCHES
129     typedef typename Value<TMatches>::Type TMatchRecord;
130     typedef String<TMatchRecord, MMap<ExternalConfigLarge<> > > TLargeMatches;
131 #endif // #ifdef RAZERS_EXTERNAL_MATCHES
132 
133     // The id of this thread.
134     unsigned threadId;
135 
136     // Each thread needs its local options since the compactionThreshold is changed.
137     // TODO(holtgrew): Change overall program structure so this is factorized out of the options struct.
138     TOptions options;
139     TOptions /*const*/ * globalOptions;
140 
141     // Each thread has its own SWIFT finder and pattern object.
142     TFilterFinder filterFinder;
143     TFilterPattern filterPattern;
144 
145     TCounts counts;  // TODO(holtgrew): Artifact?
146 
147     TLargeMatches matches;  // TODO(holtgrew): However, not used in global store since global reads-to-reference alignment requires everything to be in memory
148     TFragmentStore /*const*/ * globalStore;
149 
150     TShape shape;
151 
152     String<String<SingleVerificationResult<TMatches> > > verificationResultBuckets;
153     String<unsigned> missingInBucket;
154     unsigned nextBucketToWriteBack;
155 
156     typedef MatchVerifier<TFragmentStore, TMatches, TOptions, TRazerSMode, TFilterPattern, TCounts> TMatchVerifier;
157     TMatchVerifier verifier;
158 
159     // Mailbox for the verification results.
160     SingleVerificationResults<TMatches> verificationResults;
161 
162     typedef MatchFilter<typename Spec<TOptions>::Type, typename TFragmentStore::TReadSeqStore, ThreadLocalStorage> TMatchFilter;
163     std::shared_ptr<TMatchFilter> matchFilter;
164 
165     String<unsigned> splitters;
166 
ThreadLocalStorage()167     ThreadLocalStorage() {}
168 };
169 
170 template <typename TSpec>
limitRead(ThreadLocalStorage<TSpec> & tls,unsigned readId,int newLimit)171 inline void limitRead(ThreadLocalStorage<TSpec> & tls, unsigned readId, int newLimit)
172 {
173     setMaxErrors(tls, readId, newLimit);
174 }
175 
176 template <typename TSpec>
disableRead(ThreadLocalStorage<TSpec> & tls,unsigned readId)177 inline void disableRead(ThreadLocalStorage<TSpec> & tls, unsigned readId)
178 {
179     setMaxErrors(tls, readId, -1);
180 }
181 
182 template <typename TMatches, typename TFragmentStore, typename THitString, typename TOptions, typename TFilterPattern>
183 struct SingleVerification;
184 
185 template <typename TMatches, typename TFragmentStore, typename THitString_, typename TOptions, typename TFilterPattern>
186 class Job<SingleVerification<TMatches, TFragmentStore, THitString_, TOptions, TFilterPattern> >
187 {
188 public:
189     typedef SingleVerificationResults<TMatches> TVerificationResults;
190     typedef THitString_ THitString;
191 
192     int threadId;
193     TVerificationResults * verificationResults;
194     TFragmentStore * globalStore;
195     unsigned contigId;
196     unsigned windowNo;
197     std::shared_ptr<THitString> hitsPtr;
198     unsigned hitGroupId;
199     unsigned hitBegin;
200     unsigned hitEnd;
201     TOptions * options;
202     TFilterPattern * filterPattern;
203 
Job()204     Job() {}
205 
Job(int threadId_,TVerificationResults & verificationResults_,TFragmentStore & globalStore_,unsigned contigId_,unsigned windowNo_,std::shared_ptr<THitString> & hitsPtr_,unsigned hitGroupId_,unsigned hitBegin_,unsigned hitEnd_,TOptions & options_,TFilterPattern & filterPattern_)206     Job(int threadId_, TVerificationResults & verificationResults_, TFragmentStore & globalStore_, unsigned contigId_, unsigned windowNo_, std::shared_ptr<THitString> & hitsPtr_, unsigned hitGroupId_, unsigned hitBegin_, unsigned hitEnd_, TOptions & options_, TFilterPattern & filterPattern_) :
207         threadId(threadId_), verificationResults(&verificationResults_), globalStore(&globalStore_), contigId(contigId_), windowNo(windowNo_), hitsPtr(hitsPtr_), hitGroupId(hitGroupId_), hitBegin(hitBegin_), hitEnd(hitEnd_), options(&options_), filterPattern(&filterPattern_)
208     {}
209 };
210 
211 // ===========================================================================
212 // Metafunctions
213 // ===========================================================================
214 
215 // ===========================================================================
216 // Functions
217 // ===========================================================================
218 
219 template <typename TMatches>
220 inline void
appendToVerificationResults(SingleVerificationResults<TMatches> & verificationResults,SingleVerificationResult<TMatches> const & results)221 appendToVerificationResults(SingleVerificationResults<TMatches> & verificationResults, SingleVerificationResult<TMatches> const & results)
222 {
223     omp_set_lock(&verificationResults.lock->lock_);
224 //     if (length(verificationResults.localMatches) > 0u)
225 // SEQAN_OMP_PRAGMA(critical)
226 //         std::cerr << "BEFORE: " << &verificationResults << " length(*front(verificationResults.localMatches).matches) == " << length(*front(verificationResults.localMatches).matches) << ", " << (void*)front(verificationResults.localMatches).matches.get() << std::endl;
227     appendValue(verificationResults.localMatches, results);
228 // SEQAN_OMP_PRAGMA(critical)
229 //     std::cerr << "AFTER:  " << &verificationResults << " length(*front(verificationResults.localMatches).matches) == " << length(*front(verificationResults.localMatches).matches) << ", " << (void*)front(verificationResults.localMatches).matches.get() << std::endl;
230     omp_unset_lock(&verificationResults.lock->lock_);
231 }
232 
233 // Uses the read ID to find the correct SWIFT pattern of the TLS, and
234 // the correct local ID within this SWIFT pattern to update the max
235 // errors.
236 //
237 // We do not disable the read right
238 template <typename TSpec, typename TReadNo, typename TMaxErrors>
239 inline void
setMaxErrors(ThreadLocalStorage<TSpec> & tls,TReadNo readNo,TMaxErrors maxErrors)240 setMaxErrors(ThreadLocalStorage<TSpec> & tls, TReadNo readNo, TMaxErrors maxErrors)
241 {
242     int localReadNo = readNo - tls.splitters[tls.threadId];
243     setMaxErrors(tls.filterPattern, localReadNo, maxErrors);
244 }
245 
246 template <
247     typename TMatches,
248     typename TFragmentStore,
249     typename TFilterFinder,
250     typename TFilterPattern,
251     typename TShape /*TODO(holtgrew): Superflous.*/,
252     typename TOptions,
253     typename TCounts,
254     typename TRazerSMode,
255     typename THitString>
workVerification(ThreadLocalStorage<MapSingleReads<TMatches,TFragmentStore,TFilterFinder,TFilterPattern,TShape,TOptions,TCounts,TRazerSMode>> & tls,Job<SingleVerification<TMatches,TFragmentStore,THitString,TOptions,TFilterPattern>> & job,String<unsigned> const & splitters)256 void workVerification(ThreadLocalStorage<MapSingleReads<TMatches, TFragmentStore, TFilterFinder, TFilterPattern, TShape, TOptions, TCounts, TRazerSMode> > & tls,
257                       Job<SingleVerification<TMatches, TFragmentStore, THitString, TOptions, TFilterPattern> > & job,
258                       String<unsigned> const & splitters)
259 {
260 #ifdef RAZERS_PROFILE
261     timelineBeginTask(TASK_VERIFY);
262 #endif  // #ifdef RAZERS_PROFILE
263     double start = sysTime();
264 
265     typedef typename Iterator<THitString, Standard>::Type THitStringIterator;
266     typedef typename TFragmentStore::TContigSeq TContigSeq;
267 
268     TContigSeq & contigSeq = job.globalStore->contigStore[job.contigId].seq;
269 #ifdef RAZERS_BANDED_MYERS
270     int64_t contigLength = length(contigSeq);
271 #endif
272 
273     std::shared_ptr<TMatches> localMatches(new TMatches());
274     resize(*localMatches, 1);
275     clear(*localMatches);
276 
277     // Initialize verifier.
278     tls.verifier.matches = localMatches.get();
279     tls.verifier.options = job.options;
280     tls.verifier.filterPattern = job.filterPattern;
281     tls.verifier.cnts = 0;
282 
283     unsigned offset = splitters[job.threadId];
284     for (THitStringIterator it = iter(*job.hitsPtr, job.hitBegin), itEnd = iter(*job.hitsPtr, job.hitEnd); it != itEnd; ++it)
285     {
286 //        if (length(swiftInfix(value(it), job.globalStore->contigStore[job.contigId].seq)) < length(tls.globalStore->readSeqStore[value(it).ndlSeqNo]))
287 //            continue;  // Skip if hit length < read length.  TODO(holtgrew): David has to fix something in banded myers to make this work.
288 
289         unsigned absReadId = offset + value(it).ndlSeqNo;
290         tls.verifier.m.readId = absReadId;
291 
292 #ifdef RAZERS_BANDED_MYERS
293         tls.verifier.patternState.leftClip = (it->hstkPos >= 0) ? 0 : -it->hstkPos;   // left clip if match begins left of the genome
294         tls.verifier.rightClip = (it->hstkPos + it->bucketWidth <= contigLength) ? 0 : it->hstkPos + it->bucketWidth - contigLength;  // right clip if match end right of the genome
295 #endif
296         matchVerify(tls.verifier, swiftInfix(value(it), contigSeq), absReadId, tls.globalStore->readSeqStore[absReadId], TRazerSMode());
297     }
298 
299 // SEQAN_OMP_PRAGMA(critical)
300 //     {
301 //         std::cerr << "thread " << omp_get_thread_num() << "; window " << job.windowNo << " hit group id " << job.hitGroupId << " thread id " << job.threadId << std::endl;
302 //         std::cerr << "localMatches.data_begin == " << (void*)(localMatches->data_begin) << ", localMatches.data_end == " << (void*)(localMatches->data_end) << std::endl;
303 //         std::cerr << "length(*localMatches) == " << length(*localMatches) << std::endl;
304 //     }
305 
306     appendToVerificationResults(*job.verificationResults, SingleVerificationResult<TMatches>(localMatches, job.hitGroupId, job.windowNo));
307     tls.options.timeVerification += sysTime() - start;
308 
309 #ifdef RAZERS_PROFILE
310     timelineEndTask(TASK_VERIFY);
311 #endif  // #ifdef RAZERS_PROFILE
312 }
313 
314 template <
315     typename TMatches,
316     typename TFragmentStore,
317     typename TFilterFinder,
318     typename TFilterPattern,
319     typename TShape /*TODO(holtgrew): Superflous.*/,
320     typename TOptions,
321     typename TCounts,
322     typename TRazerSMode>
323 void
writeBackToLocal(ThreadLocalStorage<MapSingleReads<TMatches,TFragmentStore,TFilterFinder,TFilterPattern,TShape,TOptions,TCounts,TRazerSMode>> & tls,String<SingleVerificationResult<TMatches>> & verificationHits,bool dontCompact)324 writeBackToLocal(ThreadLocalStorage<MapSingleReads<TMatches, TFragmentStore, TFilterFinder, TFilterPattern, TShape, TOptions, TCounts, TRazerSMode> > & tls, String<SingleVerificationResult<TMatches> > & verificationHits, bool dontCompact)
325 {
326 #ifdef RAZERS_PROFILE
327     timelineBeginTask(TASK_WRITEBACK);
328 #endif  // #ifdef RAZERS_PROFILE
329     if (tls.threadId == 0u && tls.options._debugLevel >= 3)
330         fprintf(stderr, "[writeback]");
331     typedef typename Size<typename TFragmentStore::TAlignedReadStore>::Type TAlignedReadStoreSize;
332 
333     TAlignedReadStoreSize oldSize = length(tls.matches);
334     TAlignedReadStoreSize newSize = oldSize;
335 
336 #ifdef RAZERS_DEFER_COMPACTION
337     (void) dontCompact;  // unused
338 
339     // (1) Write back verification results into bucket.
340     for (unsigned i = 0; i < length(verificationHits); ++i)
341     {
342         // verificationHits[i].windowNo;
343 // SEQAN_OMP_PRAGMA(critical)
344 //         std::cerr << "thread " << omp_get_thread_num() << " got (" << verificationHits[i].windowNo << ", " << verificationHits[i].hitGroupId << ")" << std::endl;
345         tls.verificationResultBuckets[verificationHits[i].windowNo][verificationHits[i].hitGroupId] = verificationHits[i];
346         tls.missingInBucket[verificationHits[i].windowNo] -= 1;
347 // SEQAN_OMP_PRAGMA(critical)
348 //         std::cerr << "thread " << omp_get_thread_num() << " {windowNo == " << verificationHits[i].windowNo << "--(" << tls.missingInBucket[verificationHits[i].windowNo] << ")}" << std::endl;
349     }
350 // SEQAN_OMP_PRAGMA(critical)
351 //     std::cerr << "thread " << omp_get_thread_num() << " [wrote " << length(verificationHits) << " matches to buckets]" << std::flush;
352 
353     unsigned const DELTA = getMaxDeviationOfOrder(tls.filterPattern);
354     // std::cerr << "(DELTA=" << DELTA << ")";
355     //std::cerr << "[DELTA=" << DELTA << std::endl;
356     size_t firstBeginPos = std::numeric_limits<size_t>::max();  // Leftmost sort position, required later for masking.
357     size_t firstWindowBegin = std::numeric_limits<size_t>::max();  // Leftmost sort position, required later for masking.
358     unsigned bucketsWrittenBack = 0;
359     // (2) Write back the longest contiguous sequence of full buckets.
360     for (; tls.nextBucketToWriteBack < length(tls.missingInBucket) && tls.missingInBucket[tls.nextBucketToWriteBack] == 0u; ++tls.nextBucketToWriteBack, ++bucketsWrittenBack)
361     {
362         // std::cerr << "(((WRITING BACK BUCKET " << tls.nextBucketToWriteBack << ")))" << std::endl;
363         // (2 a) Compute new size, reserve memory, copy data.
364         size_t originalSize = length(tls.matches);
365         unsigned idx = tls.nextBucketToWriteBack;
366 // SEQAN_OMP_PRAGMA(critical)
367 //         std::cerr << "\n";
368         for (unsigned i = 0; i < length(tls.verificationResultBuckets[idx]); ++i)
369         {
370 // SEQAN_OMP_PRAGMA(critical)
371 //             {
372 //             std::cerr << "thread " << omp_get_thread_num() << " accessing (" << idx << ", " << i << ")" << std::endl;
373 //             // unsigned len = length(*tls.verificationResultBuckets[idx][i].matches);
374 //             std::cerr << "thread " << omp_get_thread_num() << " len=" << length(*tls.verificationResultBuckets[idx][i].matches) << "|" << std::endl;
375 //             std::cerr << "thread " << omp_get_thread_num() << " newSize=" << newSize << "|" << std::endl;
376 //             std::cerr << "thread " << omp_get_thread_num() << " matches.data_begin == " << (void*)(tls.verificationResultBuckets[idx][i].matches->data_begin) << ", matches.data_end == " << (void*)(tls.verificationResultBuckets[idx][i].matches->data_end) << std::endl;
377 //             }
378             SEQAN_ASSERT_NEQ(tls.verificationResultBuckets[idx][i].matches.get(), static_cast<TMatches *>(0));
379             newSize += length(*tls.verificationResultBuckets[idx][i].matches);
380         }
381         reserve(tls.matches, newSize);
382         for (unsigned i = 0; i < length(tls.verificationResultBuckets[idx]); ++i)
383         {
384             if (!empty(*tls.verificationResultBuckets[idx][i].matches))
385                 // std::cerr << "BUCKET FROM WINDOW " << tls.nextBucketToWriteBack << "\t" << front(*tls.verificationResultBuckets[idx][i].matches).beginPos << "\t" << back(*tls.verificationResultBuckets[idx][i].matches).endPos << "\t" << length(*tls.verificationResultBuckets[idx][i].matches) << std::endl;
386                 append(tls.matches, *tls.verificationResultBuckets[idx][i].matches);
387         }
388 
389         // (2 b) Get begin position.
390         size_t beginPos = originalSize;
391         // std::cerr << "originalSize = " << originalSize << std::endl;
392         if (beginPos > 0u)
393             beginPos -= 1;
394         size_t dPos = 1;
395         // Exponential search backwards.  After masking, reads are sorted by begin position.
396         size_t windowBegin = tls.options.windowSize * idx;
397         if (firstWindowBegin == std::numeric_limits<size_t>::max())
398             firstWindowBegin = windowBegin;
399         while (beginPos > 0u &&
400                static_cast<size_t>(tls.matches[beginPos].beginPos) < windowBegin &&
401                static_cast<size_t>(tls.matches[beginPos].beginPos + 10 * DELTA) > windowBegin)
402         {
403             if (beginPos > dPos)
404                 beginPos -= dPos;
405             else
406                 beginPos = 0;
407             dPos *= 2;
408         }
409         // // Binary search forwards.
410         // typedef typename Iterator<TMatches>::Type TIterator;
411         // typedef typename Value<TMatches>::Type TMatch;
412         // TMatch m;
413         // m.beginPos = windowBegin;
414         // LessBeginPos<TMatch> cmp;
415         // TIterator it = std::lower_bound(begin(tls.matches, Standard()) + beginPos, end(tls.matches, Standard()), m, cmp);
416         // beginPos = it - begin(tls.matches, Standard());
417         if (firstBeginPos == std::numeric_limits<size_t>::max())
418             firstBeginPos = beginPos;
419 
420 // SEQAN_OMP_PRAGMA(critical)
421 //         if (length(tls.matches) > 0u)
422 //             std::cerr << "((MASKING FROM " << tls.matches[beginPos].beginPos << " TO " << windowBegin + tls.options.windowSize << " ~ " << back(tls.matches).beginPos << "))" << std::endl
423 //                       << "  ->> " << length(tls.matches) - beginPos << " of " << length(tls.matches) << " elements , beginPos == " << beginPos << std::endl;
424 // SEQAN_OMP_PRAGMA(critical)
425 //         if (length(tls.matches) > 0u)
426 //             std::cerr << "((MASKING FROM " << tls.matches[beginPos].beginPos << " TO " << windowBegin + tls.options.windowSize << "))" << std::endl;
427 // SEQAN_OMP_PRAGMA(critical)
428 //         std::cerr << "thread " << omp_get_thread_num() << " (masking from " << beginPos << " to end=" << length(tls.matches) << ")" << std::endl;
429         // Do not mask duplicates if not in edit distance mode and not using pigeonhole filter
430         if (!IsSameType<typename TRazerSMode::TGapMode, RazerSGapped>::VALUE && tls.options.threshold != 0)
431             continue;
432 
433         // (2 c) Mask duplicates from beginPos to the end position.
434         maskDuplicates(tls.matches, begin(tls.matches, Standard()) + beginPos, end(tls.matches, Standard()), tls.options, TRazerSMode());
435     }
436 
437     // std::cerr << "[wrote back " << bucketsWrittenBack << " buckets (" << tls.nextBucketToWriteBack << "/" << length(tls.missingInBucket) << ")]" << std::endl;
438     if (bucketsWrittenBack > 0u)
439     {
440         // (3) Update match filter data structure for disabling reads.
441         size_t nextWindowBegin = tls.options.windowSize * (tls.nextBucketToWriteBack);
442         if (tls.nextBucketToWriteBack == length(tls.missingInBucket)) // is last?
443             nextWindowBegin += DELTA;
444         // std::cerr << "((REGISTERING/PROCESSING FROM " << firstWindowBegin << " TO " << nextWindowBegin << "))" << std::endl;
445         typedef typename Iterator<TMatches>::Type TIterator;
446         TIterator itBegin = begin(tls.matches, Standard()) + firstBeginPos;
447         TIterator itEnd = end(tls.matches, Standard());
448         TIterator it = itBegin;
449         // SEQAN_ASSERT_LT(itBegin->beginPos, nextWindowBegin);
450         for (; it != itEnd && static_cast<size_t>(it->beginPos + DELTA) <= nextWindowBegin; ++it)
451         {
452             if (it->orientation == '-')
453                 continue;                          // Skip masked reads.
454             if (it->isRegistered)
455                 continue;
456             it->isRegistered = true;
457             registerRead(*tls.matchFilter, it->readId, it->score);
458         }
459         itEnd = it;
460         it = itBegin;
461         unsigned disabled = 0;
462         for (; it != itEnd; ++it)
463         {
464             if (it->orientation == '-')
465                 continue;                          // Skip masked reads.
466             disabled += processRead(*tls.matchFilter, it->readId);
467         }
468         if (tls.options._debugLevel >= 2 && disabled > 0)
469             fprintf(stderr, " [%u reads disabled]", disabled);
470     }
471 #else  // #ifdef RAZERS_DEFER_COMPACTION
472     for (unsigned i = 0; i < length(verificationHits); ++i)
473         newSize += length(*verificationHits[i].matches);
474 
475     reserve(tls.matches, newSize);
476 
477     // Write back all matches from verification to the block local store.
478     for (unsigned i = 0; i < length(verificationHits); ++i)
479         append(tls.matches, *verificationHits[i].matches);
480 
481     // Possibly compact matches.
482     if (!dontCompact && length(tls.matches) > tls.options.compactThresh)
483     {
484 #ifdef RAZERS_PROFILE
485         timelineBeginTask(TASK_COMPACT);
486 #endif  // #ifdef RAZERS_PROFILE
487         typedef typename TFragmentStore::TAlignedReadStore TAlignedReadStore;
488         typename Size<TAlignedReadStore>::Type oldSize = length(tls.matches);
489 
490         // if (tls.threadId == 0u && tls.options._debugLevel >= 3)
491         //     fprintf(stderr, "[compact]");
492         if (IsSameType<typename TRazerSMode::TGapMode, RazerSGapped>::VALUE || tls.options.threshold == 0)
493             maskDuplicates(tls.matches, tls.options, TRazerSMode());  // overlapping parallelograms cause duplicates
494 
495         compactMatches(tls.matches, tls.counts, tls.options, TRazerSMode(), tls, COMPACT);
496 
497         if (length(tls.matches) * 4 > oldSize)       // the threshold should not be raised if too many matches were removed
498         {
499             while (tls.options.compactThresh < oldSize)
500                 tls.options.compactThresh *= tls.options.compactMult;
501             // tls.options.compactThresh += (tls.options.compactThresh >> 1);  // if too many matches were removed
502             if (tls.threadId == 0u && tls.options._debugLevel >= 3)
503                 fprintf(stderr, "[raising threshold to %u]", unsigned(tls.options.compactThresh));
504         }
505 #ifdef RAZERS_PROFILE
506         timelineEndTask(TASK_COMPACT);
507 #endif  // #ifdef RAZERS_PROFILE
508     }
509 #endif  // #ifdef RAZERS_DEFER_COMPACTION
510 
511 #ifdef RAZERS_PROFILE
512     timelineEndTask(TASK_WRITEBACK);
513 #endif  // #ifdef RAZERS_PROFILE
514 }
515 
516 template <typename TMatches>
clearLocalMatches(String<TMatches * > & localMatches)517 void clearLocalMatches(String<TMatches *> & localMatches)
518 {
519     for (unsigned i = 0; i < length(localMatches); ++i)
520         delete localMatches[i];
521     clear(localMatches);
522 }
523 
524 template <typename TMatches>
clearLocalMatches(String<SingleVerificationResult<TMatches>> & localMatches)525 void clearLocalMatches(String<SingleVerificationResult<TMatches> > & localMatches)
526 {
527     clear(localMatches);
528 }
529 
530 // Find read matches in one genome sequence.
531 //
532 // The parallelization is simple.  We perform the filtering window-wise.
533 // Then, we generate verification jobs from the swift hits.  The SWIFT hits
534 // are distributed by a simple static load balancing of adjacent hits.  These
535 // are then processed by all "leading" threads, where leading threads are
536 // those with the largest number of processed windows.
537 template <
538     typename TFSSpec,
539     typename TFSConfig,
540     typename TThreadLocalStorages,
541     typename TContigId,
542     typename TCounts,
543     typename TSpec,
544     typename TShape,
545     typename TAlignMode,
546     typename TGapMode,
547     typename TScoreMode,
548     typename TMatchNPolicy,
549     typename TFilterSpec>
_mapSingleReadsParallelToContig(FragmentStore<TFSSpec,TFSConfig> & store,TThreadLocalStorages & threadLocalStorages,String<unsigned> const & splitters,TContigId const & contigId,TCounts &,char orientation,RazerSCoreOptions<TSpec> & options,TShape const &,RazerSMode<TAlignMode,TGapMode,TScoreMode,TMatchNPolicy> const &,TFilterSpec)550 void _mapSingleReadsParallelToContig(
551     FragmentStore<TFSSpec, TFSConfig> & store,
552     TThreadLocalStorages & threadLocalStorages,
553     String<unsigned> const & splitters,
554     TContigId const & contigId,
555     TCounts & /*cnts*/,
556     char                                                      orientation,
557     RazerSCoreOptions<TSpec> & options,
558     TShape const & /*shape*/,
559     RazerSMode<TAlignMode, TGapMode, TScoreMode, TMatchNPolicy> const & /*mode*/,
560     TFilterSpec)
561 {
562 #ifdef RAZERS_PROFILE
563     timelineBeginTask(TASK_ON_CONTIG);
564 #endif  // #ifdef RAZERS_PROFILE
565     typedef FragmentStore<TFSSpec, TFSConfig>                       TFragmentStore;
566     typedef typename TFragmentStore::TContigSeq                     TContigSeq;
567     typedef typename TFragmentStore::TReadSeqStore                  TReadSeqStore;
568     typedef typename Value<TReadSeqStore>::Type const               TRead;
569     typedef StringSet<TRead>                                        TReadSet;
570     typedef Index<TReadSet, IndexQGram<TShape, OpenAddressing> >    TIndex;         // q-gram index
571     //typedef typename Size<TReadSeqStore>::Type                      TSize;
572 
573     //typedef RazerSMode<TAlignMode, TGapMode, TScoreMode, TMatchNPolicy> TRazerSMode;
574     typedef RazerSCoreOptions<TSpec> TOptions;
575 
576     typedef typename Value<TThreadLocalStorages>::Type TThreadLocalStorage;
577     typedef typename TThreadLocalStorage::TMatches TMatches;
578 
579     // TODO(holtgrew): What about cnts, mode?
580 
581     typedef Finder<TContigSeq, TFilterSpec>                         TFilterFinder;
582     typedef Pattern<TIndex, TFilterSpec>                            TFilterPattern;
583 
584     typedef RazerSCoreOptions<TSpec> TOptions;
585 
586     typedef typename WindowFindResult<TFilterFinder>::Type THitString;
587     typedef Job<SingleVerification<TMatches, TFragmentStore, THitString, TOptions, TFilterPattern> > TVerificationJob;
588 
589     // Debug output...
590     if (options._debugLevel >= 1)
591     {
592         std::cerr << std::endl << "Process genome seq #" << contigId;
593         if (orientation == 'F')
594             std::cerr << "[fwd]";
595         else
596             std::cerr << "[rev]";
597     }
598 
599     // -----------------------------------------------------------------------
600     // Reverse-complement contig if necessary.
601     // -----------------------------------------------------------------------
602     TContigSeq & contigSeq = store.contigStore[contigId].seq;
603 #ifdef RAZERS_PROFILE
604     timelineBeginTask(TASK_REVCOMP);
605 #endif  // #ifdef RAZERS_PROFILE
606     if (orientation == 'R')
607         reverseComplement(contigSeq);
608 #ifdef RAZERS_PROFILE
609     timelineEndTask(TASK_REVCOMP);
610 #endif  // #ifdef RAZERS_PROFILE
611 
612     // -----------------------------------------------------------------------
613     // Per-contig initialization of thread local storage objects.
614     // -----------------------------------------------------------------------
615     // TODO(holtgrew): Maybe put into its own function?
616     for (unsigned i = 0; i < options.threadCount; ++i)
617     {
618         // Initialize verifier object.
619         threadLocalStorages[i].verifier.onReverseComplement = (orientation == 'R');
620         threadLocalStorages[i].verifier.genomeLength = length(contigSeq);
621         threadLocalStorages[i].verifier.oneMatchPerBucket = false;
622         threadLocalStorages[i].verifier.m.contigId = contigId;
623     }
624 
625     // -----------------------------------------------------------------------
626     // Perform filtration.
627     // -----------------------------------------------------------------------
628     TaskQueue<TVerificationJob, OmpLock> taskQueue;
629     volatile unsigned leaderWindowsDone = 0;  // Number of windows done in leaders.
630     volatile unsigned threadsFiltering = options.threadCount;
631 
632     // We will create the swift finder for thread 0 first.  This will trigger parallel repeat finding in the SWIFT
633     // finder construction.  Then, we copy over the finder to all threads.
634 #ifdef RAZERS_PROFILE
635     timelineBeginTask(TASK_COPY_FINDER);
636 #endif  // #ifdef RAZERS_PROFILE
637 
638     threadLocalStorages[0].filterFinder = TFilterFinder(store.contigStore[contigId].seq, threadLocalStorages[0].options.repeatLength, 1);
639 
640 #ifdef RAZERS_PROFILE
641     timelineEndTask(TASK_COPY_FINDER);
642 #endif  // #ifdef RAZERS_PROFILE
643 
644     SEQAN_OMP_PRAGMA(parallel)
645     {
646         unsigned windowsDone = 0;
647 
648         // Initialization.
649         TThreadLocalStorage & tls = threadLocalStorages[omp_get_thread_num()];
650 #ifdef RAZERS_PROFILE
651         timelineBeginTask(TASK_COPY_FINDER);
652 #endif  // #ifdef RAZERS_PROFILE
653         // tls.filterFinder = TFilterFinder(store.contigStore[contigId].seq, tls.options.repeatLength, 1);
654         if (omp_get_thread_num() != 0)
655             tls.filterFinder = threadLocalStorages[0].filterFinder;
656 
657         // wait until everyone copied the filter of thread 0 before it is changed
658         SEQAN_OMP_PRAGMA(barrier)
659 
660 #ifdef RAZERS_PROFILE
661         timelineEndTask(TASK_COPY_FINDER);
662         timelineBeginTask(TASK_FILTER);
663 #endif  // #ifdef RAZERS_PROFILE
664 
665         if (!windowFindBegin(tls.filterFinder, tls.filterPattern, tls.options.errorRate))
666             std::cerr << "ERROR: windowFindBegin() failed in thread " << tls.threadId << std::endl;
667 
668 #ifdef RAZERS_PROFILE
669         timelineEndTask(TASK_FILTER);
670 #endif  // #ifdef RAZERS_PROFILE
671 
672         // Pre-allocate buckets.
673         tls.nextBucketToWriteBack = 0;
674         resize(tls.verificationResultBuckets, unsigned(ceil(1.0 * (length(contigSeq) + 1000) / options.windowSize)));  // +1000 for the case where contig seq length is multiple of window size
675 // SEQAN_OMP_PRAGMA(critical)
676 //         std::cerr << "window count: " << length(tls.verificationResultBuckets) << std::endl;
677         clear(tls.missingInBucket);
678         resize(tls.missingInBucket, length(tls.verificationResultBuckets), std::numeric_limits<unsigned>::max());
679 
680         // For each filtration window...
681         bool hasMore = !empty(host(tls.filterFinder));
682         while (hasMore)
683         {
684 #ifdef RAZERS_PROFILE
685             timelineBeginTask(TASK_FILTER);
686 #endif  // #ifdef RAZERS_PROFILE
687             double filterStart = sysTime();
688             // fprintf(stderr, "[filter]");
689             hasMore = windowFindNext(tls.filterFinder, tls.filterPattern, tls.options.windowSize);
690             // std::cerr << "FILTERING WINDOW " << windowsDone << std::endl << "\t" << tls.options.windowSize;
691 
692             windowsDone += 1;  // Local windows done count.
693             atomicMax(leaderWindowsDone, windowsDone);
694 
695             std::shared_ptr<THitString> hitsPtr(new THitString()); //TODO (weese:) Could we reuse memory here?
696             resize(*hitsPtr, 1);
697             clear(*hitsPtr);
698             using std::swap;
699             swap(*hitsPtr, getWindowFindHits(tls.filterFinder));
700             THitString & hits = *hitsPtr;
701             tls.options.countFiltration += length(hits);
702             // std::cerr << "  HITS: " << length(hits) << std::endl;
703             // if (length(hits) > 0u)
704             //     std::cerr << "  RANGE " << front(hits).hstkPos << "\t" << back(hits).bucketWidth << std::endl;
705 
706             // Enqueue verification jobs.
707             if (length(hits) > 0u)
708             {
709                 // Compute splitters, given a verification package size and a
710                 // bound on the package count.
711                 String<unsigned> splitters;
712                 unsigned packageCount = tls.options.maxVerificationPackageCount * omp_get_max_threads();
713                 computeSplittersBySlotSize(splitters, length(hits), tls.options.verificationPackageSize, packageCount);
714 
715                 // Push verification jobs to the job queue.
716                 String<TVerificationJob> jobs;
717                 reserve(jobs, length(splitters) - 1);
718                 for (unsigned i = 1; i < length(splitters); ++i)
719                 {
720 // SEQAN_OMP_PRAGMA(critical)
721 //                     std::cerr << "\n";
722                     appendValue(jobs, TVerificationJob(tls.threadId, tls.verificationResults, store, contigId, windowsDone - 1, hitsPtr, i - 1, splitters[i - 1], splitters[i], *tls.globalOptions, tls.filterPattern));
723 // SEQAN_OMP_PRAGMA(critical)
724 //                     std::cerr << "new job(" << tls.threadId << ", tls.verificationResults, store, " << contigId << ", " << windowsDone - 1 << ", hitsPtr, " << i - 1 << ", " << splitters[i - 1] << ", " << splitters[i] << ", *tls.globalOptions, tls.filterPattern)" << std::endl;
725                 }
726                 pushFront(taskQueue, jobs);
727 
728                 // Preallocate space in bucket and initialize "to do" counter.
729                 clear(tls.verificationResultBuckets[windowsDone - 1]);
730                 resize(tls.verificationResultBuckets[windowsDone - 1], length(splitters) - 1);
731                 tls.missingInBucket[windowsDone - 1] = length(splitters) - 1;
732                 // for (unsigned i = 0; i < length(splitters) - 1; ++i)
733                 //     tls.missingInBucket[windowsDone - 1] -= splitters[i] == splitters[i + 1];
734             }
735             else
736             {
737                 tls.missingInBucket[windowsDone - 1] = 0;
738             }
739             tls.options.timeFiltration += sysTime() - filterStart;
740 #ifdef RAZERS_PROFILE
741             timelineEndTask(TASK_FILTER);
742 #endif  // #ifdef RAZERS_PROFILE
743 
744             // Perform verification as long as we are a leader and there are filtration jobs to perform.
745             while (leaderWindowsDone == windowsDone)
746             {
747                 TVerificationJob job;
748                 if (!popFront(job, taskQueue))
749                     break;
750                 // fprintf(stderr, "[verify]");
751                 workVerification(tls, job, splitters);
752             }
753 
754             // Write back verification results for this thread so far.
755             //
756             // First, swap out the current set of local stores from the verification results.
757             omp_set_lock(&tls.verificationResults.lock->lock_);
758             String<SingleVerificationResult<TMatches> > localMatches;
759 // SEQAN_OMP_PRAGMA(critical)
760 //             std::cerr << "thread " << omp_get_thread_num() << " SWAPPING " << &tls.verificationResults.localMatches << "\n";
761             swap(localMatches, tls.verificationResults.localMatches);
762             omp_unset_lock(&tls.verificationResults.lock->lock_);
763             // Don't compact matches if in configured 'block fraction' of genome.
764             size_t hstckLen = tls.filterFinder.endPos - tls.filterFinder.startPos;
765             size_t hstckLeft = tls.filterFinder.endPos - tls.filterFinder.curPos;
766             double fracTodo = 1.0  * hstckLeft / hstckLen;
767             bool dontCompact = tls.options.noCompactFrac >= fracTodo;
768             // Write back the contents of these stores to the thread-local store.
769             writeBackToLocal(tls, localMatches, dontCompact);
770 // #ifndef RAZERS_DEFER_COMPACTION
771             clearLocalMatches(localMatches);
772 // #endif  // #ifndef RAZERS_DEFER_COMPACTION
773         }
774 
775         // Finalization
776         windowFindEnd(tls.filterFinder, tls.filterPattern);
777 
778         SEQAN_OMP_PRAGMA(atomic)
779         threadsFiltering -= 1;
780 
781         // Continue to try to help verify.
782         while (threadsFiltering > 0u)
783         {
784             TVerificationJob job;
785             if (popFront(job, taskQueue))
786                 workVerification(tls, job, splitters);
787         }
788 
789         // After every thread is done with everything, write back once more.
790         SEQAN_OMP_PRAGMA(barrier)
791         writeBackToLocal(tls, tls.verificationResults.localMatches, true);
792 // #ifndef RAZERS_DEFER_COMPACTION
793         clearLocalMatches(tls.verificationResults.localMatches);
794 // #endif  // #ifndef RAZERS_DEFER_COMPACTION
795         // std::cerr << "AT END OF CONTIG #alignments " << length(tls.matches) << std::endl;
796     }
797 
798 
799     // NOTE:
800     // We never undo the reverse-complementing!
801     // It is not necessary as the contigs are freed by unlockAndFreeContig
802     // They are loaded again in dumpMatches if necessary
803     //
804     // if (!unlockAndFreeContig(store, contigId))						// if the contig is still used
805     //     if (orientation == 'R')	reverseComplement(contigSeq);	// we have to restore original orientation
806 #ifdef RAZERS_PROFILE
807     timelineEndTask(TASK_ON_CONTIG);
808 #endif  // #ifdef RAZERS_PROFILE
809 }
810 
811 // Global initialization of block local storages.
812 template <typename TThreadLocalStorages, typename TFragmentStore, typename TSplitters, typename TShape, typename TOptions>
initializeThreadLocalStoragesSingle(TThreadLocalStorages & threadLocalStorages,TFragmentStore & store,TSplitters const & splitters,TShape & shape,TOptions & options)813 void initializeThreadLocalStoragesSingle(TThreadLocalStorages & threadLocalStorages,
814                                          TFragmentStore /*const*/ & store,
815                                          TSplitters const & splitters,
816                                          TShape /*const*/ & shape,
817                                          TOptions /*const*/ & options)
818 {
819     SEQAN_ASSERT_GT(length(splitters), 1u);
820     int threadCount = length(splitters) - 1;
821 
822     typedef typename Value<TThreadLocalStorages>::Type TThreadLocalStorage;
823     typedef typename TThreadLocalStorage::TFilterPattern TFilterPattern;
824     typedef typename Host<TFilterPattern>::Type TIndex;
825     typedef typename Position<typename TFragmentStore::TContigStore>::Type TPosition;
826 
827     resize(threadLocalStorages, threadCount);
828     SEQAN_OMP_PRAGMA(parallel for schedule(static, 1))
829     for (int i = 0; i < threadCount; ++i)
830     {
831         TThreadLocalStorage & tls = threadLocalStorages[i];
832 
833         tls.threadId = i;
834         tls.globalStore = &store;
835         tls.shape = shape;
836         tls.options = options;  // TODO(holtgrew): Copy for stats and threshold, really good?
837         tls.globalOptions = &options;
838         tls.splitters = splitters;
839 
840 #ifdef RAZERS_DEFER_COMPACTION
841         typedef typename TThreadLocalStorage::TMatchFilter TMatchFilter;
842         double READ_FRAC_WITH_HISTO = 0.01;
843         tls.matchFilter.reset(new TMatchFilter(tls.splitters[tls.threadId + 1] - tls.splitters[tls.threadId], options.matchHistoStartThreshold, READ_FRAC_WITH_HISTO, tls, tls.splitters[tls.threadId], tls.globalStore->readSeqStore, tls.options));
844         tls.options.compactThresh = std::numeric_limits<unsigned>::max();
845 #endif // #ifdef RAZERS_DEFER_COMPACTION
846 
847         // Clear pattern and set parameters.
848         TFilterPattern & filterPattern = tls.filterPattern;
849         clear(filterPattern);
850 
851         // Initialize the index.
852         TIndex & index = host(tls.filterPattern);
853         clear(index);
854 //        indexText(index).limitsValid = false;
855 //        assign(indexText(index).strings, infix(store.readSeqStore, splitters[i], splitters[i + 1]), Exact());
856 
857         clear(indexText(index));
858         for (TPosition j = splitters[i]; j < splitters[i + 1]; ++j)
859             appendValue(indexText(index), store.readSeqStore[j]);
860         //unsigned x = length(indexText(index));
861         //fprintf(stderr, "Index #%d has %u entries.\n", i, x);
862         index.shape = shape;
863 
864 #ifdef RAZERS_OPENADDRESSING
865         index.alpha = options.loadFactor;
866 #endif
867         cargo(index).abundanceCut = options.abundanceCut;
868         cargo(index)._debugLevel = options._debugLevel;
869 
870         // Configure filter pattern
871         // (if this is a pigeonhole filter, all sequences must be appended first)
872         _applyFilterOptions(filterPattern, options);
873 
874         tls.filterPattern.params.printDots = (tls.threadId == 0) && (tls.options._debugLevel > 0);
875     }
876 }
877 
878 // Write back from thread local storages to global store.
879 template <typename TFragmentStore,
880           typename TThreadLocalStorages>
881 void
writeBackToGlobalStore(TFragmentStore & target,TThreadLocalStorages & threadLocalStorages,bool isSingleEnd)882 writeBackToGlobalStore(
883     TFragmentStore & target,
884     TThreadLocalStorages /*const*/ & threadLocalStorages,
885     bool isSingleEnd)      // begin/end already swapped for paired-end reads
886 {
887     typedef typename Size<typename TFragmentStore::TAlignedReadStore>::Type TAlignedReadStoreSize;
888     typedef typename Value<typename TFragmentStore::TAlignedReadStore>::Type TAlignedReadStoreElem;
889     typedef typename Value<typename TFragmentStore::TAlignQualityStore>::Type TAlignedQualStoreElem;
890     typedef typename Value<TThreadLocalStorages>::Type TThreadLocalStorage;
891     typedef typename TThreadLocalStorage::TMatches TMatches;
892     typedef typename Iterator<TMatches, Standard>::Type TMatchesIterator;
893 
894     // Update the IDs and calculate new size so the prefix increment can be
895     // used in the loops.
896     TAlignedReadStoreSize oldSize = length(target.alignedReadStore);
897     TAlignedReadStoreSize newSize = oldSize;
898 
899     for (unsigned i = 0; i < length(threadLocalStorages); ++i)
900         newSize += length(threadLocalStorages[i].matches);
901 
902     // Resize first so copying happens at most once and not every for each
903     // block in the worst case
904     resize(target.alignedReadStore, newSize);
905     resize(target.alignQualityStore, newSize);
906 
907     // Append single block stores.
908     // TODO(holtgrew): Do in parallel!
909     for (unsigned i = 0; i < length(threadLocalStorages); ++i)
910     {
911         TMatchesIterator it = begin(threadLocalStorages[i].matches, Standard());
912         TMatchesIterator itEnd = end(threadLocalStorages[i].matches, Standard());
913         for (; it != itEnd; ++it, ++oldSize)
914         {
915             using std::swap;
916             if (isSingleEnd && it->orientation == 'R')
917                 swap(it->beginPos, it->endPos);
918             target.alignedReadStore[oldSize] = TAlignedReadStoreElem(oldSize, it->readId, it->contigId, it->beginPos, it->endPos);
919             if (!isSingleEnd)
920                 SEQAN_ASSERT_NEQ(it->pairMatchId, Value<TMatches>::Type::INVALID_ID);
921             target.alignedReadStore[oldSize].pairMatchId = it->pairMatchId;
922             target.alignQualityStore[oldSize] = TAlignedQualStoreElem(it->pairScore, it->score, -it->score);
923         }
924     }
925 }
926 
// Convenience overload of writeBackToGlobalStore() for single-end reads.
template <typename TFragmentStore,
          typename TThreadLocalStorages>
void
writeBackToGlobalStore(
    TFragmentStore & target,
    TThreadLocalStorages /*const*/ & threadLocalStorages)
{
    // Single-end mode: reverse-strand matches still need begin/end swapping.
    bool const isSingleEnd = true;
    writeBackToGlobalStore(target, threadLocalStorages, isSingleEnd);
}
936 
// Performs splitting of reads, initialization of OpenMP and the calls
// mapSingleReadsParallelToContig for each contig.
//
// Workflow: set the OpenMP thread count from options, build per-thread
// local storages, map reads against each contig (forward and/or reverse
// as configured), compact/mask the per-thread matches, write them back
// into the global fragment store, and finally accumulate per-thread
// statistics into the global options.  Returns 0.
template <
    typename TFSSpec,
    typename TFSConfig,
    typename TCounts,
    typename TSpec,
    typename TShape,
    typename TAlignMode,
    typename TGapMode,
    typename TScoreMode,
    typename TMatchNPolicy,
    typename TFilterSpec>
int _mapSingleReadsParallel(
    FragmentStore<TFSSpec, TFSConfig> & store,
    TCounts & cnts,
    RazerSCoreOptions<TSpec> & options,
    TShape const & shape,
    RazerSMode<TAlignMode, TGapMode, TScoreMode, TMatchNPolicy> const & mode,
    TFilterSpec)
{
    typedef FragmentStore<TFSSpec, TFSConfig>                       TFragmentStore;
    typedef typename TFragmentStore::TReadSeqStore                  TReadSeqStore;
    typedef typename Value<TReadSeqStore>::Type const               TRead;
    typedef StringSet<TRead>                                        TReadSet;
    typedef Index<TReadSet, IndexQGram<TShape, OpenAddressing> >    TIndex;         // q-gram index
    //typedef typename Size<TReadSeqStore>::Type                      TSize;

    typedef Pattern<TIndex, TFilterSpec>                            TFilterPattern;
    //typedef Pattern<TRead, MyersUkkonen>                            TMyersPattern;  // verifier
    typedef RazerSCoreOptions<TSpec> TOptions;

    typedef RazerSMode<TAlignMode, TGapMode, TScoreMode, TMatchNPolicy> TRazerSMode;
    typedef typename TFragmentStore::TContigSeq                     TContigSeq;
    typedef Finder<TContigSeq, TFilterSpec>                         TFilterFinder;

    typedef typename TFragmentStore::TContigSeq TContigSeq;
    typedef typename Position<TContigSeq>::Type TContigPos;
    typedef MatchRecord<TContigPos> TMatchRecord;
    typedef String<TMatchRecord> TMatches;

    // -----------------------------------------------------------------------
    // Initialize global information.
    // -----------------------------------------------------------------------

    // Save OpenMP maximal thread count so we can restore it below, then set
    // from options.
    if (options._debugLevel >= 1)
        std::cerr << std::endl << "Number of threads:               \t" << options.threadCount << std::endl;
    int oldMaxThreads = omp_get_max_threads();
    omp_set_num_threads(options.threadCount);

#ifdef RAZERS_PROFILE
    timelineBeginTask(TASK_INIT);
    double beginInit = sysTime();
#endif  // #ifdef RAZERS_PROFILE

    // Clear/initialize global stats.
    options.countFiltration = 0;
    options.countVerification = 0;
    options.timeMapReads = 0;
    options.timeDumpResults = 0;

    // -----------------------------------------------------------------------
    // Initialize thread local storages.
    // -----------------------------------------------------------------------
    SEQAN_PROTIMESTART(initTime);
    // Reads are distributed over threads by equally-sized slots.
    String<unsigned> splitters;
    computeSplittersBySlotCount(splitters, length(store.readNameStore), options.threadCount);
    typedef ThreadLocalStorage<MapSingleReads<TMatches, TFragmentStore, TFilterFinder, TFilterPattern, TShape, TOptions, TCounts, TRazerSMode> > TThreadLocalStorage;
    String<TThreadLocalStorage> threadLocalStorages;
    initializeThreadLocalStoragesSingle(threadLocalStorages, store, splitters, shape, options);

#ifdef RAZERS_PROFILE
    double endInit = sysTime();
    std::cerr << "TIME initialization: " << (endInit - beginInit) << " s";
    timelineEndTask(TASK_INIT);
#endif  // #ifdef RAZERS_PROFILE
    double timeInitialization = SEQAN_PROTIMEDIFF(initTime);
    if (options._debugLevel >= 1)
        std::cerr << std::endl << "Initialization took              \t" << timeInitialization << " seconds" << std::endl;

    // -----------------------------------------------------------------------
    // Perform parallel mapping.
    // -----------------------------------------------------------------------

    // Save compaction threshold and set global threshold to infinity, so matchVerify does not compact!
    int oldThreshold = options.compactThresh;
    options.compactThresh = std::numeric_limits<unsigned>::max();

    // For each contig: Map reads in parallel.
    SEQAN_PROTIMESTART(findTime);
    for (unsigned contigId = 0; contigId < length(store.contigStore); ++contigId)
    {
        lockContig(store, contigId);
        if (options.forward)
            _mapSingleReadsParallelToContig(store, threadLocalStorages, splitters, contigId, cnts, 'F', options, shape, mode, TFilterSpec());
        if (options.reverse)
            _mapSingleReadsParallelToContig(store, threadLocalStorages, splitters, contigId, cnts, 'R', options, shape, mode, TFilterSpec());
        unlockAndFreeContig(store, contigId);
    }
    double endMapping = sysTime();
#ifdef RAZERS_PROFILE
    std::cerr << std::endl << "TIME mapping: " << (endMapping - endInit) << " s" << std::endl;
#endif  // #ifdef RAZERS_PROFILE

#ifdef RAZERS_EXTERNAL_MATCHES
    // Compute whether to use slow, sequential sorting or parallel in-memory sorting.
    uint64_t totalMatchCount = 0;
    uint64_t maxMatchCount = 0;
    for (unsigned i = 0; i < length(threadLocalStorages); ++i)
    {
        totalMatchCount += length(threadLocalStorages[i].matches);
        maxMatchCount = _max(maxMatchCount, length(threadLocalStorages[i].matches));
    }
    bool useExternalSort = false;
    bool useSequentialCompaction = false;
    if (options.availableMatchesMemorySize == -1)
    {
        useExternalSort = true;
    }
    else if (options.availableMatchesMemorySize != 0)
    {
        // Pick external sorting if not even the largest single thread-local
        // match set fits into memory; otherwise fall back to sequential
        // compaction when the total does not fit.
        typedef typename Value<TMatches>::Type TMatch;
        int64_t totalMemoryRequired = sizeof(TMatch) * totalMatchCount;
        int64_t maxMemoryRequired = sizeof(TMatch) * maxMatchCount;
        if (options.availableMatchesMemorySize < totalMemoryRequired)
        {
            if (options.availableMatchesMemorySize < maxMemoryRequired)
            {
                useExternalSort = true;
            }
            else
            {
                useSequentialCompaction = true;
            }
        }
    }

    // std::cerr << "useExternalSort == " << useExternalSort << "\n"
    //           << "useSequentialCompaction == " << useSequentialCompaction << "\n";

    // Switch between using parallel compaction, sequential compaction, and
    // sequential compaction with external sorting.  The actual switch for the
    // sorting is in function compactMatches().
    if (useSequentialCompaction || useExternalSort)
    {
        // NOTE(review): this sequential loop indexes the storages via
        // omp_get_thread_num() rather than the loop variable i — outside a
        // parallel region this is always storage 0; confirm intent.
        for (unsigned i = 0; i < length(threadLocalStorages); ++i)
        {
            // remove duplicates when mapping with gaps or using pigeonhole filter
            if (IsSameType<TGapMode, RazerSGapped>::VALUE || options.threshold == 0)
                maskDuplicates(threadLocalStorages[omp_get_thread_num()].matches, options, mode);
            Nothing nothing;
            CompactMatchesMode compactMode = useSequentialCompaction ? COMPACT_FINAL : COMPACT_FINAL_EXTERNAL;
            // std::cerr << "BEFORE FINAL COMPACTION " << length(threadLocalStorages[omp_get_thread_num()].matches) << std::endl;
            compactMatches(threadLocalStorages[omp_get_thread_num()].matches, cnts, options, mode, nothing, compactMode);
            // std::cerr << "AFTER FINAL COMPACTION " << length(threadLocalStorages[omp_get_thread_num()].matches) << std::endl;
        }
    }
    else
    {
#endif  // #ifdef RAZERS_EXTERNAL_MATCHES
    // Parallel path: every thread masks/compacts its own match set.
    SEQAN_OMP_PRAGMA(parallel)
    {
// TODO(holtgrew): We would really like to stop the additional masking step, the incremental masking SHOULD have taken care of this. Thus, the following should be ifndef and not ifdef.
#ifndef RAZERS_DEFER_COMPACTION
        // remove duplicates when mapping with gaps or using pigeonhole filter
        if (IsSameType<TGapMode, RazerSGapped>::VALUE || options.threshold == 0)
            maskDuplicates(threadLocalStorages[omp_get_thread_num()].matches, options, mode);
#endif  // #ifndef RAZERS_DEFER_COMPACTION
        Nothing nothing;
        // std::cerr << "BEFORE FINAL COMPACTION " << length(threadLocalStorages[omp_get_thread_num()].matches) << std::endl;
// SEQAN_OMP_PRAGMA(critical)
//             std::cerr << "BEFORE FINAL COMPACTION " << length(threadLocalStorages[omp_get_thread_num()].matches) << std::endl;
        compactMatches(threadLocalStorages[omp_get_thread_num()].matches, cnts, options, mode, nothing, COMPACT_FINAL);
        // std::cerr << "AFTER FINAL COMPACTION " << length(threadLocalStorages[omp_get_thread_num()].matches) << std::endl;
// SEQAN_OMP_PRAGMA(critical)
//             std::cerr << "AFTER FINAL COMPACTION " << length(threadLocalStorages[omp_get_thread_num()].matches) << std::endl;
    }
    // NOTE(review): this barrier sits outside the parallel region above —
    // confirm it is intentional (it has no team to synchronize here).
    SEQAN_OMP_PRAGMA(barrier)
#ifdef RAZERS_EXTERNAL_MATCHES
    // This brace closes the else-branch opened in the RAZERS_EXTERNAL_MATCHES
    // block above; braces are intentionally unbalanced across the #ifdef.
}

#endif // #ifdef RAZERS_EXTERNAL_MATCHES

    // Write back local stores to global stores.
    writeBackToGlobalStore(store, threadLocalStorages);
    // std::cerr << "length(threadLocalStorages[0].matches) == " << length(threadLocalStorages[0].matches) << std::endl;
    // std::cerr << "length(store.alignedReadStore) == " << length(store.alignedReadStore) << std::endl;
    double endWriteback = sysTime();
    options.timeFsCopy = endWriteback - endMapping;

    // TODO(holtgrew): Sum up cnts?!

    // -----------------------------------------------------------------------
    // Collect global statistics, cleanup.
    // -----------------------------------------------------------------------

    // Restore old compaction threshold.
    options.compactThresh = oldThreshold;

    // Add up thread-local filtration and verification counters and print totals.
    for (unsigned i = 0; i < length(threadLocalStorages); ++i)
    {
        options.countFiltration += threadLocalStorages[i].options.countFiltration;
        options.countVerification += threadLocalStorages[i].options.countVerification;
    }

    if (options._debugLevel >= 1)
    {
        for (unsigned i = 0; i < length(threadLocalStorages); ++i)
        {
            std::cerr << "Thread #" << i << std::endl;
            std::cerr << "  Masking duplicates took        \t" << threadLocalStorages[i].options.timeMaskDuplicates << " seconds" << std::endl;
            std::cerr << "  Compacting matches took        \t" << threadLocalStorages[i].options.timeCompactMatches << " seconds" << std::endl;
            std::cerr << "  Time for filtration            \t" << threadLocalStorages[i].options.timeFiltration << " seconds" << std::endl;
            std::cerr << "  Time for verifications         \t" << threadLocalStorages[i].options.timeVerification << " seconds" << std::endl;
        }
        std::cerr << "Time for copying back            \t" << options.timeFsCopy << " seconds" << std::endl;
    }

    options.timeMapReads = SEQAN_PROTIMEDIFF(findTime);
    if (options._debugLevel >= 1)
        std::cerr << std::endl << "Finding reads took               \t" << options.timeMapReads << " seconds" << std::endl;
    if (options._debugLevel >= 1)
    {
        std::cerr << std::endl;
        std::cerr << "___FILTRATION_STATS____" << std::endl;
        std::cerr << "Filtration counter:      " << options.countFiltration << std::endl;
        std::cerr << "Successful verifications: " << options.countVerification << std::endl;
    }

    // Restore global state.
    omp_set_num_threads(oldMaxThreads);

    return 0;
}
1174 
1175 // Performs splitting of reads, initialization of OpenMP and the calls
1176 // mapSingleReadsParallelToContig for each contig.
1177 template <
1178     typename TFSSpec,
1179     typename TFSConfig,
1180     typename TCounts,
1181     typename TSpec,
1182     typename TShape,
1183     typename TAlignMode,
1184     typename TGapMode,
1185     typename TScoreMode,
1186     typename TMatchNPolicy>
_mapSingleReadsParallel(FragmentStore<TFSSpec,TFSConfig> & store,TCounts & cnts,RazerSCoreOptions<TSpec> & options,TShape const & shape,RazerSMode<TAlignMode,TGapMode,TScoreMode,TMatchNPolicy> const & mode)1187 int _mapSingleReadsParallel(
1188     FragmentStore<TFSSpec, TFSConfig> & store,
1189     TCounts & cnts,
1190     RazerSCoreOptions<TSpec> & options,
1191     TShape const & shape,
1192     RazerSMode<TAlignMode, TGapMode, TScoreMode, TMatchNPolicy> const & mode)
1193 {
1194     if (options.threshold > 0)
1195     {
1196         typedef typename If<IsSameType<TGapMode, RazerSGapped>, SwiftSemiGlobal, SwiftSemiGlobalHamming>::Type TSwiftSpec;
1197         return _mapSingleReadsParallel(store, cnts, options, shape, mode, Swift<TSwiftSpec>());
1198     }
1199     else
1200     {
1201         typedef typename If<IsSameType<TGapMode, RazerSGapped>, void, Hamming_>::Type TPigeonholeSpec;
1202         return _mapSingleReadsParallel(store, cnts, options, Shape<Dna, OneGappedShape>(), mode, Pigeonhole<TPigeonholeSpec>());
1203     }
1204 }
1205 
1206 }  // namespace seqan
1207 
1208 #endif  // #ifndef RAZERS_RAZERS_PARALLEL_H_
1209