1 #include "peprocessor.h"
2 #include "fastqreader.h"
3 #include <iostream>
4 #include <unistd.h>
5 #include <functional>
6 #include <thread>
7 #include <memory.h>
8 #include "util.h"
9 #include "adaptertrimmer.h"
10 #include "basecorrector.h"
11 #include "jsonreporter.h"
12 #include "htmlreporter.h"
13 #include "polyx.h"
14 
PairEndProcessor(Options * opt)15 PairEndProcessor::PairEndProcessor(Options* opt){
16     mOptions = opt;
17     mLeftReaderFinished = false;
18     mRightReaderFinished = false;
19     mFinishedThreads = 0;
20     mFilter = new Filter(opt);
21     mUmiProcessor = new UmiProcessor(opt);
22 
23     int isizeBufLen = mOptions->insertSizeMax + 1;
24     mInsertSizeHist = new atomic_long[isizeBufLen];
25     memset(mInsertSizeHist, 0, sizeof(atomic_long)*isizeBufLen);
26     mLeftWriter =  NULL;
27     mRightWriter = NULL;
28     mUnpairedLeftWriter =  NULL;
29     mUnpairedRightWriter = NULL;
30     mMergedWriter = NULL;
31     mFailedWriter = NULL;
32     mOverlappedWriter = NULL;
33 
34     mDuplicate = NULL;
35     if(mOptions->duplicate.enabled) {
36         mDuplicate = new Duplicate(mOptions);
37     }
38 
39     mLeftPackReadCounter = 0;
40     mRightPackReadCounter = 0;
41     mPackProcessedCounter = 0;
42 
43     mLeftReadPool = new ReadPool(mOptions);
44     mRightReadPool = new ReadPool(mOptions);
45 }
46 
~PairEndProcessor()47 PairEndProcessor::~PairEndProcessor() {
48     delete mInsertSizeHist;
49     if(mDuplicate) {
50         delete mDuplicate;
51         mDuplicate = NULL;
52     }
53     if(mLeftReadPool) {
54         delete mLeftReadPool;
55         mLeftReadPool = NULL;
56     }
57     if(mRightReadPool) {
58         delete mRightReadPool;
59         mRightReadPool = NULL;
60     }
61 }
62 
initOutput()63 void PairEndProcessor::initOutput() {
64     if(!mOptions->unpaired1.empty())
65         mUnpairedLeftWriter = new WriterThread(mOptions, mOptions->unpaired1);
66 
67     if(!mOptions->unpaired2.empty() && mOptions->unpaired2 != mOptions->unpaired1)
68         mUnpairedRightWriter = new WriterThread(mOptions, mOptions->unpaired2);
69 
70     if(mOptions->merge.enabled) {
71         if(!mOptions->merge.out.empty())
72             mMergedWriter = new WriterThread(mOptions, mOptions->merge.out);
73     }
74 
75     if(!mOptions->failedOut.empty())
76         mFailedWriter = new WriterThread(mOptions, mOptions->failedOut);
77 
78     if(!mOptions->overlappedOut.empty())
79         mOverlappedWriter = new WriterThread(mOptions, mOptions->overlappedOut);
80 
81     if(mOptions->out1.empty())
82         return;
83 
84     mLeftWriter = new WriterThread(mOptions, mOptions->out1);
85     if(!mOptions->out2.empty())
86         mRightWriter = new WriterThread(mOptions, mOptions->out2);
87 }
88 
closeOutput()89 void PairEndProcessor::closeOutput() {
90     if(mLeftWriter) {
91         delete mLeftWriter;
92         mLeftWriter = NULL;
93     }
94     if(mRightWriter) {
95         delete mRightWriter;
96         mRightWriter = NULL;
97     }
98     if(mMergedWriter) {
99         delete mMergedWriter;
100         mMergedWriter = NULL;
101     }
102     if(mFailedWriter) {
103         delete mFailedWriter;
104         mFailedWriter = NULL;
105     }
106     if(mOverlappedWriter) {
107         delete mOverlappedWriter;
108         mOverlappedWriter = NULL;
109     }
110     if(mUnpairedLeftWriter) {
111         delete mUnpairedLeftWriter;
112         mUnpairedLeftWriter = NULL;
113     }
114     if(mUnpairedRightWriter) {
115         delete mUnpairedRightWriter;
116         mUnpairedRightWriter = NULL;
117     }
118 }
119 
initConfig(ThreadConfig * config)120 void PairEndProcessor::initConfig(ThreadConfig* config) {
121     if(mOptions->out1.empty())
122         return;
123     if(mOptions->split.enabled) {
124         config->initWriterForSplit();
125     }
126 }
127 
128 
process()129 bool PairEndProcessor::process(){
130     if(!mOptions->split.enabled)
131         initOutput();
132 
133     std::thread* readerLeft = NULL;
134     std::thread* readerRight = NULL;
135     std::thread* readerInterveleaved = NULL;
136 
137     mLeftInputLists = new SingleProducerSingleConsumerList<ReadPack*>*[mOptions->thread];
138     mRightInputLists = new SingleProducerSingleConsumerList<ReadPack*>*[mOptions->thread];
139 
140     ThreadConfig** configs = new ThreadConfig*[mOptions->thread];
141     for(int t=0; t<mOptions->thread; t++){
142         mLeftInputLists[t] = new SingleProducerSingleConsumerList<ReadPack*>();
143         mRightInputLists[t] = new SingleProducerSingleConsumerList<ReadPack*>();
144         configs[t] = new ThreadConfig(mOptions, t, true);
145         configs[t]->setInputListPair(mLeftInputLists[t], mRightInputLists[t]);
146         initConfig(configs[t]);
147     }
148 
149     if(mOptions->interleavedInput)
150         readerInterveleaved= new std::thread(std::bind(&PairEndProcessor::interleavedReaderTask, this));
151     else {
152         readerLeft = new std::thread(std::bind(&PairEndProcessor::readerTask, this, true));
153         readerRight = new std::thread(std::bind(&PairEndProcessor::readerTask, this, false));
154     }
155 
156     std::thread** threads = new thread*[mOptions->thread];
157     for(int t=0; t<mOptions->thread; t++){
158         threads[t] = new std::thread(std::bind(&PairEndProcessor::processorTask, this, configs[t]));
159     }
160 
161     std::thread* leftWriterThread = NULL;
162     std::thread* rightWriterThread = NULL;
163     std::thread* unpairedLeftWriterThread = NULL;
164     std::thread* unpairedRightWriterThread = NULL;
165     std::thread* mergedWriterThread = NULL;
166     std::thread* failedWriterThread = NULL;
167     std::thread* overlappedWriterThread = NULL;
168     if(mLeftWriter)
169         leftWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mLeftWriter));
170     if(mRightWriter)
171         rightWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mRightWriter));
172     if(mUnpairedLeftWriter)
173         unpairedLeftWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mUnpairedLeftWriter));
174     if(mUnpairedRightWriter)
175         unpairedRightWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mUnpairedRightWriter));
176     if(mMergedWriter)
177         mergedWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mMergedWriter));
178     if(mFailedWriter)
179         failedWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mFailedWriter));
180     if(mOverlappedWriter)
181         overlappedWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mOverlappedWriter));
182 
183     if(readerInterveleaved) {
184         readerInterveleaved->join();
185     } else {
186         readerLeft->join();
187         readerRight->join();
188     }
189     for(int t=0; t<mOptions->thread; t++){
190         threads[t]->join();
191     }
192 
193     if(!mOptions->split.enabled) {
194         if(leftWriterThread)
195             leftWriterThread->join();
196         if(rightWriterThread)
197             rightWriterThread->join();
198         if(unpairedLeftWriterThread)
199             unpairedLeftWriterThread->join();
200         if(unpairedRightWriterThread)
201             unpairedRightWriterThread->join();
202         if(mergedWriterThread)
203             mergedWriterThread->join();
204         if(failedWriterThread)
205             failedWriterThread->join();
206         if(overlappedWriterThread)
207             overlappedWriterThread->join();
208     }
209 
210     if(mOptions->verbose)
211         loginfo("start to generate reports\n");
212 
213     // merge stats and filter results
214     vector<Stats*> preStats1;
215     vector<Stats*> postStats1;
216     vector<Stats*> preStats2;
217     vector<Stats*> postStats2;
218     vector<FilterResult*> filterResults;
219     for(int t=0; t<mOptions->thread; t++){
220         preStats1.push_back(configs[t]->getPreStats1());
221         postStats1.push_back(configs[t]->getPostStats1());
222         preStats2.push_back(configs[t]->getPreStats2());
223         postStats2.push_back(configs[t]->getPostStats2());
224         filterResults.push_back(configs[t]->getFilterResult());
225     }
226     Stats* finalPreStats1 = Stats::merge(preStats1);
227     Stats* finalPostStats1 = Stats::merge(postStats1);
228     Stats* finalPreStats2 = Stats::merge(preStats2);
229     Stats* finalPostStats2 = Stats::merge(postStats2);
230     FilterResult* finalFilterResult = FilterResult::merge(filterResults);
231 
232     cerr << "Read1 before filtering:"<<endl;
233     finalPreStats1->print();
234     cerr << endl;
235     cerr << "Read2 before filtering:"<<endl;
236     finalPreStats2->print();
237     cerr << endl;
238     if(!mOptions->merge.enabled) {
239         cerr << "Read1 after filtering:"<<endl;
240         finalPostStats1->print();
241         cerr << endl;
242         cerr << "Read2 after filtering:"<<endl;
243         finalPostStats2->print();
244     } else {
245         cerr << "Merged and filtered:"<<endl;
246         finalPostStats1->print();
247     }
248 
249     cerr << endl;
250     cerr << "Filtering result:"<<endl;
251     finalFilterResult->print();
252 
253     double dupRate = 0.0;
254     if(mOptions->duplicate.enabled) {
255         dupRate = mDuplicate->getDupRate();
256         cerr << endl;
257         cerr << "Duplication rate: " << dupRate * 100.0 << "%" << endl;
258     }
259 
260     // insert size distribution
261     int peakInsertSize = getPeakInsertSize();
262     cerr << endl;
263     cerr << "Insert size peak (evaluated by paired-end reads): " << peakInsertSize << endl;
264 
265     if(mOptions->merge.enabled) {
266         cerr << endl;
267         cerr << "Read pairs merged: " << finalFilterResult->mMergedPairs << endl;
268         if(finalPostStats1->getReads() > 0) {
269             double postMergedPercent = 100.0 * finalFilterResult->mMergedPairs / finalPostStats1->getReads();
270             double preMergedPercent = 100.0 * finalFilterResult->mMergedPairs / finalPreStats1->getReads();
271             cerr << "% of original read pairs: " << preMergedPercent << "%" << endl;
272             cerr << "% in reads after filtering: " << postMergedPercent << "%" << endl;
273         }
274         cerr << endl;
275     }
276 
277     // make JSON report
278     JsonReporter jr(mOptions);
279     jr.setDup(dupRate);
280     jr.setInsertHist(mInsertSizeHist, peakInsertSize);
281     jr.report(finalFilterResult, finalPreStats1, finalPostStats1, finalPreStats2, finalPostStats2);
282 
283     // make HTML report
284     HtmlReporter hr(mOptions);
285     hr.setDup(dupRate);
286     hr.setInsertHist(mInsertSizeHist, peakInsertSize);
287     hr.report(finalFilterResult, finalPreStats1, finalPostStats1, finalPreStats2, finalPostStats2);
288 
289     // clean up
290     for(int t=0; t<mOptions->thread; t++){
291         delete threads[t];
292         threads[t] = NULL;
293         delete configs[t];
294         configs[t] = NULL;
295     }
296 
297     if(readerInterveleaved) {
298         delete readerInterveleaved;
299     } else {
300         delete readerLeft;
301         delete readerRight;
302     }
303 
304     delete finalPreStats1;
305     delete finalPostStats1;
306     delete finalPreStats2;
307     delete finalPostStats2;
308     delete finalFilterResult;
309 
310     delete[] threads;
311     delete[] configs;
312 
313     if(leftWriterThread)
314         delete leftWriterThread;
315     if(rightWriterThread)
316         delete rightWriterThread;
317     if(unpairedLeftWriterThread)
318         delete unpairedLeftWriterThread;
319     if(unpairedRightWriterThread)
320         delete unpairedRightWriterThread;
321     if(mergedWriterThread)
322         delete mergedWriterThread;
323     if(failedWriterThread)
324         delete failedWriterThread;
325     if(overlappedWriterThread)
326         delete overlappedWriterThread;
327 
328     if(!mOptions->split.enabled)
329         closeOutput();
330 
331     return true;
332 }
333 
getPeakInsertSize()334 int PairEndProcessor::getPeakInsertSize() {
335     int peak = 0;
336     long maxCount = -1;
337     for(int i=0; i<mOptions->insertSizeMax; i++) {
338         if(mInsertSizeHist[i] > maxCount) {
339             peak = i;
340             maxCount = mInsertSizeHist[i];
341         }
342     }
343     return peak;
344 }
345 
recycleToPool1(int tid,Read * r)346 void PairEndProcessor::recycleToPool1(int tid, Read* r) {
347     // failed to recycle, then delete it
348     if(!mLeftReadPool->input(tid, r))
349         delete r;
350 }
351 
recycleToPool2(int tid,Read * r)352 void PairEndProcessor::recycleToPool2(int tid, Read* r) {
353     // failed to recycle, then delete it
354     if(!mRightReadPool->input(tid, r))
355         delete r;
356 }
357 
processPairEnd(ReadPack * leftPack,ReadPack * rightPack,ThreadConfig * config)358 bool PairEndProcessor::processPairEnd(ReadPack* leftPack, ReadPack* rightPack, ThreadConfig* config){
359     if(leftPack->count != rightPack->count) {
360         cerr << endl;
361         cerr << "WARNNIG: different read numbers of the " << mPackProcessedCounter << " pack" << endl;
362         cerr << "Read1 pack size: " << leftPack->count << endl;
363         cerr << "Read2 pack size: " << rightPack->count << endl;
364         cerr << endl;
365     }
366     int tid = config->getThreadId();
367 
368     string* outstr1 = new string();
369     string* outstr2 = new string();
370     string* unpairedOut1 = new string();
371     string* unpairedOut2 = new string();
372     string* singleOutput = new string();
373     string* mergedOutput = new string();
374     string* failedOut = new string();
375     string* overlappedOut = new string();
376 
377     int readPassed = 0;
378     int mergedCount = 0;
379     for(int p=0;p<leftPack->count && p<rightPack->count;p++){
380         Read* or1 = leftPack->data[p];
381         Read* or2 = rightPack->data[p];
382 
383         int lowQualNum1 = 0;
384         int nBaseNum1 = 0;
385         int lowQualNum2 = 0;
386         int nBaseNum2 = 0;
387 
388         // stats the original read before trimming
389         config->getPreStats1()->statRead(or1);
390         config->getPreStats2()->statRead(or2);
391 
392         // handling the duplication profiling
393         bool dedupOut = false;
394         if(mDuplicate) {
395             bool isDup = mDuplicate->checkPair(or1, or2);
396             if(mOptions->duplicate.dedup && isDup)
397                 dedupOut = true;
398         }
399 
400         // filter by index
401         if(mOptions->indexFilter.enabled && mFilter->filterByIndex(or1, or2)) {
402             recycleToPool1(tid, or1);
403             or1 = NULL;
404             recycleToPool2(tid, or2);
405             or2 = NULL;
406             continue;
407         }
408 
409         // fix MGI
410         if(mOptions->fixMGI) {
411             or1->fixMGI();
412             or2->fixMGI();
413         }
414         // umi processing
415         if(mOptions->umi.enabled)
416             mUmiProcessor->process(or1, or2);
417 
418         // trim in head and tail, and apply quality cut in sliding window
419         int frontTrimmed1 = 0;
420         int frontTrimmed2 = 0;
421         Read* r1 = mFilter->trimAndCut(or1, mOptions->trim.front1, mOptions->trim.tail1, frontTrimmed1);
422         Read* r2 = mFilter->trimAndCut(or2, mOptions->trim.front2, mOptions->trim.tail2, frontTrimmed2);
423 
424         if(r1 != NULL && r2!=NULL) {
425             if(mOptions->polyGTrim.enabled)
426                 PolyX::trimPolyG(r1, r2, config->getFilterResult(), mOptions->polyGTrim.minLen);
427         }
428         bool isizeEvaluated = false;
429         if(r1 != NULL && r2!=NULL && (mOptions->adapter.enabled || mOptions->correction.enabled)){
430             OverlapResult ov = OverlapAnalysis::analyze(r1, r2, mOptions->overlapDiffLimit, mOptions->overlapRequire, mOptions->overlapDiffPercentLimit/100.0);
431             // we only use thread 0 to evaluae ISIZE
432             if(config->getThreadId() == 0) {
433                 statInsertSize(r1, r2, ov, frontTrimmed1, frontTrimmed2);
434                 isizeEvaluated = true;
435             }
436             if(mOptions->correction.enabled) {
437                 BaseCorrector::correctByOverlapAnalysis(r1, r2, config->getFilterResult(), ov);
438             }
439             if(mOptions->adapter.enabled) {
440                 bool trimmed = AdapterTrimmer::trimByOverlapAnalysis(r1, r2, config->getFilterResult(), ov, frontTrimmed1, frontTrimmed2);
441                 bool trimmed1 = trimmed;
442                 bool trimmed2 = trimmed;
443                 if(!trimmed){
444                     if(mOptions->adapter.hasSeqR1)
445                         trimmed1 = AdapterTrimmer::trimBySequence(r1, config->getFilterResult(), mOptions->adapter.sequence, false);
446                     if(mOptions->adapter.hasSeqR2)
447                         trimmed2 = AdapterTrimmer::trimBySequence(r2, config->getFilterResult(), mOptions->adapter.sequenceR2, true);
448                 }
449                 if(mOptions->adapter.hasFasta) {
450                     AdapterTrimmer::trimByMultiSequences(r1, config->getFilterResult(), mOptions->adapter.seqsInFasta, false, !trimmed1);
451                     AdapterTrimmer::trimByMultiSequences(r2, config->getFilterResult(), mOptions->adapter.seqsInFasta, true, !trimmed2);
452                 }
453             }
454         }
455 
456         if(r1 != NULL && r2!=NULL && mOverlappedWriter) {
457             OverlapResult ov = OverlapAnalysis::analyze(r1, r2, mOptions->overlapDiffLimit, mOptions->overlapRequire, 0);
458             if(ov.overlapped) {
459                 Read* overlappedRead = new Read(r1->mName, new string(r1->mSeq->substr(max(0,ov.offset)), ov.overlap_len), r1->mStrand, new string(r1->mQuality->substr(max(0,ov.offset)), ov.overlap_len));
460                 overlappedRead->appendToString(overlappedOut);
461                 recycleToPool1(tid, overlappedRead);
462             }
463         }
464 
465         if(config->getThreadId() == 0 && !isizeEvaluated && r1 != NULL && r2!=NULL) {
466             OverlapResult ov = OverlapAnalysis::analyze(r1, r2, mOptions->overlapDiffLimit, mOptions->overlapRequire, mOptions->overlapDiffPercentLimit/100.0);
467             statInsertSize(r1, r2, ov, frontTrimmed1, frontTrimmed2);
468             isizeEvaluated = true;
469         }
470 
471         if(r1 != NULL && r2!=NULL) {
472             if(mOptions->polyXTrim.enabled)
473                 PolyX::trimPolyX(r1, r2, config->getFilterResult(), mOptions->polyXTrim.minLen);
474         }
475 
476         if(r1 != NULL && r2!=NULL) {
477             if( mOptions->trim.maxLen1 > 0 && mOptions->trim.maxLen1 < r1->length())
478                 r1->resize(mOptions->trim.maxLen1);
479             if( mOptions->trim.maxLen2 > 0 && mOptions->trim.maxLen2 < r2->length())
480                 r2->resize(mOptions->trim.maxLen2);
481         }
482 
483         Read* merged = NULL;
484         // merging mode
485         bool mergeProcessed = false;
486         if(mOptions->merge.enabled && r1 && r2) {
487             OverlapResult ov = OverlapAnalysis::analyze(r1, r2, mOptions->overlapDiffLimit, mOptions->overlapRequire, mOptions->overlapDiffPercentLimit/100.0);
488             if(ov.overlapped) {
489                 merged = OverlapAnalysis::merge(r1, r2, ov);
490                 int result = mFilter->passFilter(merged);
491                 config->addFilterResult(result, 2);
492                 if(result == PASS_FILTER) {
493                     merged->appendToString(mergedOutput);
494                     config->getPostStats1()->statRead(merged);
495                     readPassed++;
496                     mergedCount++;
497                 }
498                 recycleToPool1(tid, merged);
499                 mergeProcessed = true;
500             } else if(mOptions->merge.includeUnmerged){
501                 int result1 = mFilter->passFilter(r1);
502                 config->addFilterResult(result1, 1);
503                 if(result1 == PASS_FILTER && !dedupOut) {
504                     r1->appendToString(mergedOutput);
505                     config->getPostStats1()->statRead(r1);
506                 }
507 
508                 int result2 = mFilter->passFilter(r2);
509                 config->addFilterResult(result2, 1);
510                 if(result2 == PASS_FILTER && !dedupOut) {
511                     r2->appendToString(mergedOutput);
512                     config->getPostStats1()->statRead(r2);
513                 }
514                 if(result1 == PASS_FILTER && result2 == PASS_FILTER )
515                     readPassed++;
516                 mergeProcessed = true;
517             }
518         }
519 
520         if(!mergeProcessed) {
521 
522             int result1 = mFilter->passFilter(r1);
523             int result2 = mFilter->passFilter(r2);
524 
525             config->addFilterResult(max(result1, result2), 2);
526 
527             if(!dedupOut) {
528 
529                 if( r1 != NULL &&  result1 == PASS_FILTER && r2 != NULL && result2 == PASS_FILTER ) {
530 
531                     if(mOptions->outputToSTDOUT && !mOptions->merge.enabled) {
532                         r1->appendToString(singleOutput);
533                         r2->appendToString(singleOutput);
534                     } else {
535                         r1->appendToString(outstr1);
536                         r2->appendToString(outstr2);
537                     }
538 
539                     // stats the read after filtering
540                     if(!mOptions->merge.enabled) {
541                         config->getPostStats1()->statRead(r1);
542                         config->getPostStats2()->statRead(r2);
543                     }
544 
545                     readPassed++;
546                 } else if( r1 != NULL &&  result1 == PASS_FILTER) {
547                     if(mUnpairedLeftWriter) {
548                         r1->appendToString(unpairedOut1);
549                         if(mFailedWriter)
550                             or2->appendToStringWithTag(failedOut, FAILED_TYPES[result2]);
551                     } else {
552                         if(mFailedWriter) {
553                             or1->appendToStringWithTag(failedOut, "paired_read_is_failing");
554                             or2->appendToStringWithTag(failedOut, FAILED_TYPES[result2]);
555                         }
556                     }
557                 } else if( r2 != NULL && result2 == PASS_FILTER) {
558                     if(mUnpairedRightWriter) {
559                         r2->appendToString(unpairedOut2);
560                         if(mFailedWriter)
561                             or1->appendToStringWithTag(failedOut,FAILED_TYPES[result1]);
562                     } else if(mUnpairedLeftWriter) {
563                         r2->appendToString(unpairedOut1);
564                         if(mFailedWriter)
565                             or1->appendToStringWithTag(failedOut,FAILED_TYPES[result1]);
566                     }  else {
567                         if(mFailedWriter) {
568                             or1->appendToStringWithTag(failedOut, FAILED_TYPES[result1]);
569                             or2->appendToStringWithTag(failedOut, "paired_read_is_failing");
570                         }
571                     }
572                 }
573             }
574         }
575 
576         // if no trimming applied, r1 should be identical to or1
577         if(r1 != or1 && r1 != NULL) {
578             recycleToPool1(tid, r1);
579             r1 = NULL;
580         }
581         // if no trimming applied, r1 should be identical to or1
582         if(r2 != or2 && r2 != NULL) {
583             recycleToPool2(tid, r2);
584             r2 = NULL;
585         }
586 
587         if(or1) {
588             recycleToPool1(tid, or1);
589             or1 = NULL;
590         }
591         if(or2) {
592             recycleToPool2(tid, or2);
593             or2 = NULL;
594         }
595     }
596 
597     if(mOptions->outputToSTDOUT) {
598         // STDOUT output
599         // if it's merging mode, write the merged reads to STDOUT
600         // otherwise write interleaved single output
601         if(mOptions->merge.enabled)
602             fwrite(mergedOutput->c_str(), 1, mergedOutput->length(), stdout);
603         else
604             fwrite(singleOutput->c_str(), 1, singleOutput->length(), stdout);
605     } else if(mOptions->split.enabled) {
606         // split output by each worker thread
607         if(!mOptions->out1.empty())
608             config->getWriter1()->writeString(outstr1);
609         if(!mOptions->out2.empty())
610             config->getWriter2()->writeString(outstr2);
611     }
612 
613     if(mMergedWriter) {
614         // write merged data
615         mMergedWriter->input(tid, mergedOutput);
616         mergedOutput = NULL;
617     }
618 
619     if(mFailedWriter) {
620         // write failed data
621         mFailedWriter->input(tid, failedOut);
622         failedOut = NULL;
623     }
624 
625     if(mOverlappedWriter) {
626         // write failed data
627         mOverlappedWriter->input(tid, overlappedOut);
628         overlappedOut = NULL;
629     }
630 
631     // normal output by left/right writer thread
632     if(mRightWriter && mLeftWriter) {
633         // write PE
634         mLeftWriter->input(tid, outstr1);
635         outstr1 = NULL;
636 
637         mRightWriter->input(tid, outstr2);
638         outstr2 = NULL;
639     } else if(mLeftWriter) {
640         // write singleOutput
641         mLeftWriter->input(tid, singleOutput);
642         singleOutput = NULL;
643     }
644     // output unpaired reads
645     if(mUnpairedLeftWriter && mUnpairedRightWriter) {
646         // write PE
647         mUnpairedLeftWriter->input(tid, unpairedOut1);
648         unpairedOut1 = NULL;
649 
650         mUnpairedRightWriter->input(tid, unpairedOut2);
651         unpairedOut2 = NULL;
652     } else if(mUnpairedLeftWriter) {
653         mUnpairedLeftWriter->input(tid, unpairedOut1);
654         unpairedOut1 = NULL;
655     }
656 
657     if(mOptions->split.byFileLines)
658         config->markProcessed(readPassed);
659     else
660         config->markProcessed(leftPack->count);
661 
662     if(mOptions->merge.enabled) {
663         config->addMergedPairs(mergedCount);
664     }
665 
666     if(outstr1)
667         delete outstr1;
668     if(outstr2)
669         delete outstr2;
670     if(unpairedOut1)
671         delete unpairedOut1;
672     if(unpairedOut2)
673         delete unpairedOut2;
674     if(singleOutput)
675         delete singleOutput;
676     if(mergedOutput)
677         delete mergedOutput;
678     if(failedOut)
679         delete failedOut;
680     if(overlappedOut)
681         delete overlappedOut;
682 
683     delete leftPack->data;
684     delete rightPack->data;
685     delete leftPack;
686     delete rightPack;
687 
688     mPackProcessedCounter++;
689 
690     return true;
691 }
692 
statInsertSize(Read * r1,Read * r2,OverlapResult & ov,int frontTrimmed1,int frontTrimmed2)693 void PairEndProcessor::statInsertSize(Read* r1, Read* r2, OverlapResult& ov, int frontTrimmed1, int frontTrimmed2) {
694     int isize = mOptions->insertSizeMax;
695     if(ov.overlapped) {
696         if(ov.offset > 0)
697             isize = r1->length() + r2->length() - ov.overlap_len + frontTrimmed1 + frontTrimmed2;
698         else
699             isize = ov.overlap_len + frontTrimmed1 + frontTrimmed2;
700     }
701 
702     if(isize > mOptions->insertSizeMax)
703         isize = mOptions->insertSizeMax;
704 
705     mInsertSizeHist[isize]++;
706 }
707 
readerTask(bool isLeft)708 void PairEndProcessor::readerTask(bool isLeft)
709 {
710     if(mOptions->verbose) {
711         if(isLeft)
712             loginfo("start to load data of read1");
713         else
714             loginfo("start to load data of read2");
715     }
716     long lastReported = 0;
717     int slept = 0;
718     long readNum = 0;
719     bool splitSizeReEvaluated = false;
720     Read** data = new Read*[PACK_SIZE];
721     memset(data, 0, sizeof(Read*)*PACK_SIZE);
722     FastqReader* reader = NULL;
723     if(isLeft) {
724         reader = new FastqReader(mOptions->in1, true, mOptions->phred64);
725         reader->setReadPool(mLeftReadPool);
726     }
727     else {
728         reader = new FastqReader(mOptions->in2, true, mOptions->phred64);
729         reader->setReadPool(mRightReadPool);
730     }
731 
732     int count=0;
733     bool needToBreak = false;
734     while(true){
735         Read* read = reader->read();
736         if(!read || needToBreak){
737             // the last pack
738             ReadPack* pack = new ReadPack;
739             pack->data = data;
740             pack->count = count;
741 
742             if(isLeft) {
743                 mLeftInputLists[mLeftPackReadCounter % mOptions->thread]->produce(pack);
744                 mLeftPackReadCounter++;
745             } else {
746                 mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(pack);
747                 mRightPackReadCounter++;
748             }
749             data = NULL;
750             if(read) {
751                 delete read;
752                 read = NULL;
753             }
754             break;
755         }
756         data[count] = read;
757         count++;
758         // configured to process only first N reads
759         if(mOptions->readsToProcess >0 && count + readNum >= mOptions->readsToProcess) {
760             needToBreak = true;
761         }
762         if(mOptions->verbose && count + readNum >= lastReported + 1000000) {
763             lastReported = count + readNum;
764             string msg;
765             if(isLeft)
766                 msg = "Read1: ";
767             else
768                 msg = "Read2: ";
769             msg += "loaded " + to_string((lastReported/1000000)) + "M reads";
770             loginfo(msg);
771         }
772         // a full pack
773         if(count == PACK_SIZE || needToBreak){
774             ReadPack* pack = new ReadPack;
775             pack->data = data;
776             pack->count = count;
777 
778             if(isLeft) {
779                 mLeftInputLists[mLeftPackReadCounter % mOptions->thread]->produce(pack);
780                 mLeftPackReadCounter++;
781             } else {
782                 mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(pack);
783                 mRightPackReadCounter++;
784             }
785 
786             //re-initialize data for next pack
787             data = new Read*[PACK_SIZE];
788             memset(data, 0, sizeof(Read*)*PACK_SIZE);
789             // if the processor is far behind this reader, sleep and wait to limit memory usage
790             if(isLeft) {
791                 while(mLeftPackReadCounter - mPackProcessedCounter > PACK_IN_MEM_LIMIT){
792                     //cerr<<"sleep"<<endl;
793                     slept++;
794                     usleep(100);
795                 }
796             } else {
797                 while(mRightPackReadCounter - mPackProcessedCounter > PACK_IN_MEM_LIMIT){
798                     //cerr<<"sleep"<<endl;
799                     slept++;
800                     usleep(100);
801                 }
802             }
803             readNum += count;
804             // if the writer threads are far behind this producer, sleep and wait
805             // check this only when necessary
806             if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) {
807                 while( (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) || (mRightWriter && mRightWriter->bufferLength() > PACK_IN_MEM_LIMIT) ){
808                     slept++;
809                     usleep(1000);
810                 }
811             }
812             // reset count to 0
813             count = 0;
814             // re-evaluate split size
815             // TODO: following codes are commented since it may cause threading related conflicts in some systems
816             /*if(mOptions->split.needEvaluation && !splitSizeReEvaluated && readNum >= mOptions->split.size) {
817                 splitSizeReEvaluated = true;
818                 // greater than the initial evaluation
819                 if(readNum >= 1024*1024) {
820                     size_t bytesRead;
821                     size_t bytesTotal;
822                     reader.getBytes(bytesRead, bytesTotal);
823                     mOptions->split.size *=  (double)bytesTotal / ((double)bytesRead * (double) mOptions->split.number);
824                     if(mOptions->split.size <= 0)
825                         mOptions->split.size = 1;
826                 }
827             }*/
828         }
829     }
830 
831     for(int t=0; t<mOptions->thread; t++) {
832         if(isLeft)
833             mLeftInputLists[t]->setProducerFinished();
834         else
835             mRightInputLists[t]->setProducerFinished();
836     }
837 
838     if(mOptions->verbose) {
839         if(isLeft) {
840             mLeftReaderFinished = true;
841             loginfo("Read1: loading completed with " + to_string(mLeftPackReadCounter) + " packs");
842         }
843         else {
844             mRightReaderFinished = true;
845             loginfo("Read2: loading completed with " + to_string(mRightPackReadCounter) + " packs");
846         }
847     }
848 
849 
850     // if the last data initialized is not used, free it
851     if(data != NULL)
852         delete[] data;
853     if(reader != NULL)
854         delete reader;
855 }
856 
interleavedReaderTask()857 void PairEndProcessor::interleavedReaderTask()
858 {
859     if(mOptions->verbose)
860         loginfo("start to load data");
861     long lastReported = 0;
862     int slept = 0;
863     long readNum = 0;
864     bool splitSizeReEvaluated = false;
865     Read** dataLeft = new Read*[PACK_SIZE];
866     Read** dataRight = new Read*[PACK_SIZE];
867     memset(dataLeft, 0, sizeof(Read*)*PACK_SIZE);
868     memset(dataRight, 0, sizeof(Read*)*PACK_SIZE);
869     FastqReaderPair reader(mOptions->in1, mOptions->in2, true, mOptions->phred64,true);
870     int count=0;
871     bool needToBreak = false;
872     while(true){
873         ReadPair* pair = reader.read();
874         // TODO: put needToBreak here is just a WAR for resolve some unidentified dead lock issue
875         if(!pair || needToBreak){
876             // the last pack
877             ReadPack* packLeft = new ReadPack;
878             ReadPack* packRight = new ReadPack;
879             packLeft->data = dataLeft;
880             packRight->data = dataRight;
881             packLeft->count = count;
882             packRight->count = count;
883 
884             mLeftInputLists[mLeftPackReadCounter % mOptions->thread]->produce(packLeft);
885             mLeftPackReadCounter++;
886 
887             mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(packRight);
888             mRightPackReadCounter++;
889 
890             dataLeft = NULL;
891             dataRight = NULL;
892             if(pair) {
893                 delete pair;
894                 pair = NULL;
895             }
896             break;
897         }
898         dataLeft[count] = pair->mLeft;
899         dataRight[count] = pair->mRight;
900         count++;
901         // configured to process only first N reads
902         if(mOptions->readsToProcess >0 && count + readNum >= mOptions->readsToProcess) {
903             needToBreak = true;
904         }
905         if(mOptions->verbose && count + readNum >= lastReported + 1000000) {
906             lastReported = count + readNum;
907             string msg = "loaded " + to_string((lastReported/1000000)) + "M read pairs";
908             loginfo(msg);
909         }
910         // a full pack
911         if(count == PACK_SIZE || needToBreak){
912             ReadPack* packLeft = new ReadPack;
913             ReadPack* packRight = new ReadPack;
914             packLeft->data = dataLeft;
915             packRight->data = dataRight;
916             packLeft->count = count;
917             packRight->count = count;
918 
919             mLeftInputLists[mLeftPackReadCounter % mOptions->thread]->produce(packLeft);
920             mLeftPackReadCounter++;
921 
922             mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(packRight);
923             mRightPackReadCounter++;
924 
925             //re-initialize data for next pack
926             dataLeft = new Read*[PACK_SIZE];
927             dataRight = new Read*[PACK_SIZE];
928             memset(dataLeft, 0, sizeof(Read*)*PACK_SIZE);
929             memset(dataRight, 0, sizeof(Read*)*PACK_SIZE);
930             // if the consumer is far behind this producer, sleep and wait to limit memory usage
931             while(mLeftPackReadCounter - mPackProcessedCounter > PACK_IN_MEM_LIMIT){
932                 //cerr<<"sleep"<<endl;
933                 slept++;
934                 usleep(100);
935             }
936             readNum += count;
937             // if the writer threads are far behind this producer, sleep and wait
938             // check this only when necessary
939             if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) {
940                 while( (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) || (mRightWriter && mRightWriter->bufferLength() > PACK_IN_MEM_LIMIT) ){
941                     slept++;
942                     usleep(1000);
943                 }
944             }
945             // reset count to 0
946             count = 0;
947             // re-evaluate split size
948             // TODO: following codes are commented since it may cause threading related conflicts in some systems
949             /*if(mOptions->split.needEvaluation && !splitSizeReEvaluated && readNum >= mOptions->split.size) {
950                 splitSizeReEvaluated = true;
951                 // greater than the initial evaluation
952                 if(readNum >= 1024*1024) {
953                     size_t bytesRead;
954                     size_t bytesTotal;
955                     reader.mLeft->getBytes(bytesRead, bytesTotal);
956                     mOptions->split.size *=  (double)bytesTotal / ((double)bytesRead * (double) mOptions->split.number);
957                     if(mOptions->split.size <= 0)
958                         mOptions->split.size = 1;
959                 }
960             }*/
961         }
962     }
963 
964     for(int t=0; t<mOptions->thread; t++) {
965         mLeftInputLists[t]->setProducerFinished();
966         mRightInputLists[t]->setProducerFinished();
967     }
968 
969     if(mOptions->verbose) {
970         loginfo("interleaved: loading completed with " + to_string(mLeftPackReadCounter) + " packs");
971     }
972 
973     mLeftReaderFinished = true;
974     mRightReaderFinished = true;
975 
976     // if the last data initialized is not used, free it
977     if(dataLeft != NULL)
978         delete[] dataLeft;
979     if(dataRight != NULL)
980         delete[] dataRight;
981 }
982 
processorTask(ThreadConfig * config)983 void PairEndProcessor::processorTask(ThreadConfig* config)
984 {
985     SingleProducerSingleConsumerList<ReadPack*>* inputLeft = config->getLeftInput();
986     SingleProducerSingleConsumerList<ReadPack*>* inputRight = config->getRightInput();
987     while(true) {
988         if(config->canBeStopped()){
989             break;
990         }
991         while(inputLeft->canBeConsumed() && inputRight->canBeConsumed()) {
992             ReadPack* dataLeft = inputLeft->consume();
993             ReadPack* dataRight = inputRight->consume();
994             processPairEnd(dataLeft, dataRight, config);
995         }
996         if(inputLeft->isProducerFinished() && !inputLeft->canBeConsumed()) {
997             break;
998         } else if(inputRight->isProducerFinished() && !inputRight->canBeConsumed()) {
999             break;
1000         } else {
1001             usleep(100);
1002         }
1003     }
1004     inputLeft->setConsumerFinished();
1005     inputRight->setConsumerFinished();
1006 
1007     mFinishedThreads++;
1008     if(mOptions->verbose) {
1009         string msg = "thread " + to_string(config->getThreadId() + 1) + " data processing completed";
1010         loginfo(msg);
1011     }
1012 
1013     if(mFinishedThreads == mOptions->thread) {
1014         if(mLeftWriter)
1015             mLeftWriter->setInputCompleted();
1016         if(mRightWriter)
1017             mRightWriter->setInputCompleted();
1018         if(mUnpairedLeftWriter)
1019             mUnpairedLeftWriter->setInputCompleted();
1020         if(mUnpairedRightWriter)
1021             mUnpairedRightWriter->setInputCompleted();
1022         if(mMergedWriter)
1023             mMergedWriter->setInputCompleted();
1024         if(mFailedWriter)
1025             mFailedWriter->setInputCompleted();
1026         if(mOverlappedWriter)
1027             mOverlappedWriter->setInputCompleted();
1028     }
1029 
1030     if(mOptions->verbose) {
1031         string msg = "thread " + to_string(config->getThreadId() + 1) + " finished";
1032         loginfo(msg);
1033     }
1034 }
1035 
writerTask(WriterThread * config)1036 void PairEndProcessor::writerTask(WriterThread* config)
1037 {
1038     while(true) {
1039         if(config->isCompleted()){
1040             // last check for possible threading related issue
1041             config->output();
1042             break;
1043         }
1044         config->output();
1045     }
1046 
1047     if(mOptions->verbose) {
1048         string msg = config->getFilename() + " writer finished";
1049         loginfo(msg);
1050     }
1051 }
1052