1 #include "peprocessor.h"
2 #include "fastqreader.h"
3 #include <iostream>
4 #include <unistd.h>
5 #include <functional>
6 #include <thread>
7 #include <memory.h>
8 #include "util.h"
9 #include "adaptertrimmer.h"
10 #include "basecorrector.h"
11 #include "jsonreporter.h"
12 #include "htmlreporter.h"
13 #include "polyx.h"
14
PairEndProcessor(Options * opt)15 PairEndProcessor::PairEndProcessor(Options* opt){
16 mOptions = opt;
17 mLeftReaderFinished = false;
18 mRightReaderFinished = false;
19 mFinishedThreads = 0;
20 mFilter = new Filter(opt);
21 mUmiProcessor = new UmiProcessor(opt);
22
23 int isizeBufLen = mOptions->insertSizeMax + 1;
24 mInsertSizeHist = new atomic_long[isizeBufLen];
25 memset(mInsertSizeHist, 0, sizeof(atomic_long)*isizeBufLen);
26 mLeftWriter = NULL;
27 mRightWriter = NULL;
28 mUnpairedLeftWriter = NULL;
29 mUnpairedRightWriter = NULL;
30 mMergedWriter = NULL;
31 mFailedWriter = NULL;
32 mOverlappedWriter = NULL;
33
34 mDuplicate = NULL;
35 if(mOptions->duplicate.enabled) {
36 mDuplicate = new Duplicate(mOptions);
37 }
38
39 mLeftPackReadCounter = 0;
40 mRightPackReadCounter = 0;
41 mPackProcessedCounter = 0;
42
43 mLeftReadPool = new ReadPool(mOptions);
44 mRightReadPool = new ReadPool(mOptions);
45 }
46
~PairEndProcessor()47 PairEndProcessor::~PairEndProcessor() {
48 delete mInsertSizeHist;
49 if(mDuplicate) {
50 delete mDuplicate;
51 mDuplicate = NULL;
52 }
53 if(mLeftReadPool) {
54 delete mLeftReadPool;
55 mLeftReadPool = NULL;
56 }
57 if(mRightReadPool) {
58 delete mRightReadPool;
59 mRightReadPool = NULL;
60 }
61 }
62
initOutput()63 void PairEndProcessor::initOutput() {
64 if(!mOptions->unpaired1.empty())
65 mUnpairedLeftWriter = new WriterThread(mOptions, mOptions->unpaired1);
66
67 if(!mOptions->unpaired2.empty() && mOptions->unpaired2 != mOptions->unpaired1)
68 mUnpairedRightWriter = new WriterThread(mOptions, mOptions->unpaired2);
69
70 if(mOptions->merge.enabled) {
71 if(!mOptions->merge.out.empty())
72 mMergedWriter = new WriterThread(mOptions, mOptions->merge.out);
73 }
74
75 if(!mOptions->failedOut.empty())
76 mFailedWriter = new WriterThread(mOptions, mOptions->failedOut);
77
78 if(!mOptions->overlappedOut.empty())
79 mOverlappedWriter = new WriterThread(mOptions, mOptions->overlappedOut);
80
81 if(mOptions->out1.empty())
82 return;
83
84 mLeftWriter = new WriterThread(mOptions, mOptions->out1);
85 if(!mOptions->out2.empty())
86 mRightWriter = new WriterThread(mOptions, mOptions->out2);
87 }
88
closeOutput()89 void PairEndProcessor::closeOutput() {
90 if(mLeftWriter) {
91 delete mLeftWriter;
92 mLeftWriter = NULL;
93 }
94 if(mRightWriter) {
95 delete mRightWriter;
96 mRightWriter = NULL;
97 }
98 if(mMergedWriter) {
99 delete mMergedWriter;
100 mMergedWriter = NULL;
101 }
102 if(mFailedWriter) {
103 delete mFailedWriter;
104 mFailedWriter = NULL;
105 }
106 if(mOverlappedWriter) {
107 delete mOverlappedWriter;
108 mOverlappedWriter = NULL;
109 }
110 if(mUnpairedLeftWriter) {
111 delete mUnpairedLeftWriter;
112 mUnpairedLeftWriter = NULL;
113 }
114 if(mUnpairedRightWriter) {
115 delete mUnpairedRightWriter;
116 mUnpairedRightWriter = NULL;
117 }
118 }
119
initConfig(ThreadConfig * config)120 void PairEndProcessor::initConfig(ThreadConfig* config) {
121 if(mOptions->out1.empty())
122 return;
123 if(mOptions->split.enabled) {
124 config->initWriterForSplit();
125 }
126 }
127
128
process()129 bool PairEndProcessor::process(){
130 if(!mOptions->split.enabled)
131 initOutput();
132
133 std::thread* readerLeft = NULL;
134 std::thread* readerRight = NULL;
135 std::thread* readerInterveleaved = NULL;
136
137 mLeftInputLists = new SingleProducerSingleConsumerList<ReadPack*>*[mOptions->thread];
138 mRightInputLists = new SingleProducerSingleConsumerList<ReadPack*>*[mOptions->thread];
139
140 ThreadConfig** configs = new ThreadConfig*[mOptions->thread];
141 for(int t=0; t<mOptions->thread; t++){
142 mLeftInputLists[t] = new SingleProducerSingleConsumerList<ReadPack*>();
143 mRightInputLists[t] = new SingleProducerSingleConsumerList<ReadPack*>();
144 configs[t] = new ThreadConfig(mOptions, t, true);
145 configs[t]->setInputListPair(mLeftInputLists[t], mRightInputLists[t]);
146 initConfig(configs[t]);
147 }
148
149 if(mOptions->interleavedInput)
150 readerInterveleaved= new std::thread(std::bind(&PairEndProcessor::interleavedReaderTask, this));
151 else {
152 readerLeft = new std::thread(std::bind(&PairEndProcessor::readerTask, this, true));
153 readerRight = new std::thread(std::bind(&PairEndProcessor::readerTask, this, false));
154 }
155
156 std::thread** threads = new thread*[mOptions->thread];
157 for(int t=0; t<mOptions->thread; t++){
158 threads[t] = new std::thread(std::bind(&PairEndProcessor::processorTask, this, configs[t]));
159 }
160
161 std::thread* leftWriterThread = NULL;
162 std::thread* rightWriterThread = NULL;
163 std::thread* unpairedLeftWriterThread = NULL;
164 std::thread* unpairedRightWriterThread = NULL;
165 std::thread* mergedWriterThread = NULL;
166 std::thread* failedWriterThread = NULL;
167 std::thread* overlappedWriterThread = NULL;
168 if(mLeftWriter)
169 leftWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mLeftWriter));
170 if(mRightWriter)
171 rightWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mRightWriter));
172 if(mUnpairedLeftWriter)
173 unpairedLeftWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mUnpairedLeftWriter));
174 if(mUnpairedRightWriter)
175 unpairedRightWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mUnpairedRightWriter));
176 if(mMergedWriter)
177 mergedWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mMergedWriter));
178 if(mFailedWriter)
179 failedWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mFailedWriter));
180 if(mOverlappedWriter)
181 overlappedWriterThread = new std::thread(std::bind(&PairEndProcessor::writerTask, this, mOverlappedWriter));
182
183 if(readerInterveleaved) {
184 readerInterveleaved->join();
185 } else {
186 readerLeft->join();
187 readerRight->join();
188 }
189 for(int t=0; t<mOptions->thread; t++){
190 threads[t]->join();
191 }
192
193 if(!mOptions->split.enabled) {
194 if(leftWriterThread)
195 leftWriterThread->join();
196 if(rightWriterThread)
197 rightWriterThread->join();
198 if(unpairedLeftWriterThread)
199 unpairedLeftWriterThread->join();
200 if(unpairedRightWriterThread)
201 unpairedRightWriterThread->join();
202 if(mergedWriterThread)
203 mergedWriterThread->join();
204 if(failedWriterThread)
205 failedWriterThread->join();
206 if(overlappedWriterThread)
207 overlappedWriterThread->join();
208 }
209
210 if(mOptions->verbose)
211 loginfo("start to generate reports\n");
212
213 // merge stats and filter results
214 vector<Stats*> preStats1;
215 vector<Stats*> postStats1;
216 vector<Stats*> preStats2;
217 vector<Stats*> postStats2;
218 vector<FilterResult*> filterResults;
219 for(int t=0; t<mOptions->thread; t++){
220 preStats1.push_back(configs[t]->getPreStats1());
221 postStats1.push_back(configs[t]->getPostStats1());
222 preStats2.push_back(configs[t]->getPreStats2());
223 postStats2.push_back(configs[t]->getPostStats2());
224 filterResults.push_back(configs[t]->getFilterResult());
225 }
226 Stats* finalPreStats1 = Stats::merge(preStats1);
227 Stats* finalPostStats1 = Stats::merge(postStats1);
228 Stats* finalPreStats2 = Stats::merge(preStats2);
229 Stats* finalPostStats2 = Stats::merge(postStats2);
230 FilterResult* finalFilterResult = FilterResult::merge(filterResults);
231
232 cerr << "Read1 before filtering:"<<endl;
233 finalPreStats1->print();
234 cerr << endl;
235 cerr << "Read2 before filtering:"<<endl;
236 finalPreStats2->print();
237 cerr << endl;
238 if(!mOptions->merge.enabled) {
239 cerr << "Read1 after filtering:"<<endl;
240 finalPostStats1->print();
241 cerr << endl;
242 cerr << "Read2 after filtering:"<<endl;
243 finalPostStats2->print();
244 } else {
245 cerr << "Merged and filtered:"<<endl;
246 finalPostStats1->print();
247 }
248
249 cerr << endl;
250 cerr << "Filtering result:"<<endl;
251 finalFilterResult->print();
252
253 double dupRate = 0.0;
254 if(mOptions->duplicate.enabled) {
255 dupRate = mDuplicate->getDupRate();
256 cerr << endl;
257 cerr << "Duplication rate: " << dupRate * 100.0 << "%" << endl;
258 }
259
260 // insert size distribution
261 int peakInsertSize = getPeakInsertSize();
262 cerr << endl;
263 cerr << "Insert size peak (evaluated by paired-end reads): " << peakInsertSize << endl;
264
265 if(mOptions->merge.enabled) {
266 cerr << endl;
267 cerr << "Read pairs merged: " << finalFilterResult->mMergedPairs << endl;
268 if(finalPostStats1->getReads() > 0) {
269 double postMergedPercent = 100.0 * finalFilterResult->mMergedPairs / finalPostStats1->getReads();
270 double preMergedPercent = 100.0 * finalFilterResult->mMergedPairs / finalPreStats1->getReads();
271 cerr << "% of original read pairs: " << preMergedPercent << "%" << endl;
272 cerr << "% in reads after filtering: " << postMergedPercent << "%" << endl;
273 }
274 cerr << endl;
275 }
276
277 // make JSON report
278 JsonReporter jr(mOptions);
279 jr.setDup(dupRate);
280 jr.setInsertHist(mInsertSizeHist, peakInsertSize);
281 jr.report(finalFilterResult, finalPreStats1, finalPostStats1, finalPreStats2, finalPostStats2);
282
283 // make HTML report
284 HtmlReporter hr(mOptions);
285 hr.setDup(dupRate);
286 hr.setInsertHist(mInsertSizeHist, peakInsertSize);
287 hr.report(finalFilterResult, finalPreStats1, finalPostStats1, finalPreStats2, finalPostStats2);
288
289 // clean up
290 for(int t=0; t<mOptions->thread; t++){
291 delete threads[t];
292 threads[t] = NULL;
293 delete configs[t];
294 configs[t] = NULL;
295 }
296
297 if(readerInterveleaved) {
298 delete readerInterveleaved;
299 } else {
300 delete readerLeft;
301 delete readerRight;
302 }
303
304 delete finalPreStats1;
305 delete finalPostStats1;
306 delete finalPreStats2;
307 delete finalPostStats2;
308 delete finalFilterResult;
309
310 delete[] threads;
311 delete[] configs;
312
313 if(leftWriterThread)
314 delete leftWriterThread;
315 if(rightWriterThread)
316 delete rightWriterThread;
317 if(unpairedLeftWriterThread)
318 delete unpairedLeftWriterThread;
319 if(unpairedRightWriterThread)
320 delete unpairedRightWriterThread;
321 if(mergedWriterThread)
322 delete mergedWriterThread;
323 if(failedWriterThread)
324 delete failedWriterThread;
325 if(overlappedWriterThread)
326 delete overlappedWriterThread;
327
328 if(!mOptions->split.enabled)
329 closeOutput();
330
331 return true;
332 }
333
getPeakInsertSize()334 int PairEndProcessor::getPeakInsertSize() {
335 int peak = 0;
336 long maxCount = -1;
337 for(int i=0; i<mOptions->insertSizeMax; i++) {
338 if(mInsertSizeHist[i] > maxCount) {
339 peak = i;
340 maxCount = mInsertSizeHist[i];
341 }
342 }
343 return peak;
344 }
345
recycleToPool1(int tid,Read * r)346 void PairEndProcessor::recycleToPool1(int tid, Read* r) {
347 // failed to recycle, then delete it
348 if(!mLeftReadPool->input(tid, r))
349 delete r;
350 }
351
recycleToPool2(int tid,Read * r)352 void PairEndProcessor::recycleToPool2(int tid, Read* r) {
353 // failed to recycle, then delete it
354 if(!mRightReadPool->input(tid, r))
355 delete r;
356 }
357
processPairEnd(ReadPack * leftPack,ReadPack * rightPack,ThreadConfig * config)358 bool PairEndProcessor::processPairEnd(ReadPack* leftPack, ReadPack* rightPack, ThreadConfig* config){
359 if(leftPack->count != rightPack->count) {
360 cerr << endl;
361 cerr << "WARNNIG: different read numbers of the " << mPackProcessedCounter << " pack" << endl;
362 cerr << "Read1 pack size: " << leftPack->count << endl;
363 cerr << "Read2 pack size: " << rightPack->count << endl;
364 cerr << endl;
365 }
366 int tid = config->getThreadId();
367
368 string* outstr1 = new string();
369 string* outstr2 = new string();
370 string* unpairedOut1 = new string();
371 string* unpairedOut2 = new string();
372 string* singleOutput = new string();
373 string* mergedOutput = new string();
374 string* failedOut = new string();
375 string* overlappedOut = new string();
376
377 int readPassed = 0;
378 int mergedCount = 0;
379 for(int p=0;p<leftPack->count && p<rightPack->count;p++){
380 Read* or1 = leftPack->data[p];
381 Read* or2 = rightPack->data[p];
382
383 int lowQualNum1 = 0;
384 int nBaseNum1 = 0;
385 int lowQualNum2 = 0;
386 int nBaseNum2 = 0;
387
388 // stats the original read before trimming
389 config->getPreStats1()->statRead(or1);
390 config->getPreStats2()->statRead(or2);
391
392 // handling the duplication profiling
393 bool dedupOut = false;
394 if(mDuplicate) {
395 bool isDup = mDuplicate->checkPair(or1, or2);
396 if(mOptions->duplicate.dedup && isDup)
397 dedupOut = true;
398 }
399
400 // filter by index
401 if(mOptions->indexFilter.enabled && mFilter->filterByIndex(or1, or2)) {
402 recycleToPool1(tid, or1);
403 or1 = NULL;
404 recycleToPool2(tid, or2);
405 or2 = NULL;
406 continue;
407 }
408
409 // fix MGI
410 if(mOptions->fixMGI) {
411 or1->fixMGI();
412 or2->fixMGI();
413 }
414 // umi processing
415 if(mOptions->umi.enabled)
416 mUmiProcessor->process(or1, or2);
417
418 // trim in head and tail, and apply quality cut in sliding window
419 int frontTrimmed1 = 0;
420 int frontTrimmed2 = 0;
421 Read* r1 = mFilter->trimAndCut(or1, mOptions->trim.front1, mOptions->trim.tail1, frontTrimmed1);
422 Read* r2 = mFilter->trimAndCut(or2, mOptions->trim.front2, mOptions->trim.tail2, frontTrimmed2);
423
424 if(r1 != NULL && r2!=NULL) {
425 if(mOptions->polyGTrim.enabled)
426 PolyX::trimPolyG(r1, r2, config->getFilterResult(), mOptions->polyGTrim.minLen);
427 }
428 bool isizeEvaluated = false;
429 if(r1 != NULL && r2!=NULL && (mOptions->adapter.enabled || mOptions->correction.enabled)){
430 OverlapResult ov = OverlapAnalysis::analyze(r1, r2, mOptions->overlapDiffLimit, mOptions->overlapRequire, mOptions->overlapDiffPercentLimit/100.0);
431 // we only use thread 0 to evaluae ISIZE
432 if(config->getThreadId() == 0) {
433 statInsertSize(r1, r2, ov, frontTrimmed1, frontTrimmed2);
434 isizeEvaluated = true;
435 }
436 if(mOptions->correction.enabled) {
437 BaseCorrector::correctByOverlapAnalysis(r1, r2, config->getFilterResult(), ov);
438 }
439 if(mOptions->adapter.enabled) {
440 bool trimmed = AdapterTrimmer::trimByOverlapAnalysis(r1, r2, config->getFilterResult(), ov, frontTrimmed1, frontTrimmed2);
441 bool trimmed1 = trimmed;
442 bool trimmed2 = trimmed;
443 if(!trimmed){
444 if(mOptions->adapter.hasSeqR1)
445 trimmed1 = AdapterTrimmer::trimBySequence(r1, config->getFilterResult(), mOptions->adapter.sequence, false);
446 if(mOptions->adapter.hasSeqR2)
447 trimmed2 = AdapterTrimmer::trimBySequence(r2, config->getFilterResult(), mOptions->adapter.sequenceR2, true);
448 }
449 if(mOptions->adapter.hasFasta) {
450 AdapterTrimmer::trimByMultiSequences(r1, config->getFilterResult(), mOptions->adapter.seqsInFasta, false, !trimmed1);
451 AdapterTrimmer::trimByMultiSequences(r2, config->getFilterResult(), mOptions->adapter.seqsInFasta, true, !trimmed2);
452 }
453 }
454 }
455
456 if(r1 != NULL && r2!=NULL && mOverlappedWriter) {
457 OverlapResult ov = OverlapAnalysis::analyze(r1, r2, mOptions->overlapDiffLimit, mOptions->overlapRequire, 0);
458 if(ov.overlapped) {
459 Read* overlappedRead = new Read(r1->mName, new string(r1->mSeq->substr(max(0,ov.offset)), ov.overlap_len), r1->mStrand, new string(r1->mQuality->substr(max(0,ov.offset)), ov.overlap_len));
460 overlappedRead->appendToString(overlappedOut);
461 recycleToPool1(tid, overlappedRead);
462 }
463 }
464
465 if(config->getThreadId() == 0 && !isizeEvaluated && r1 != NULL && r2!=NULL) {
466 OverlapResult ov = OverlapAnalysis::analyze(r1, r2, mOptions->overlapDiffLimit, mOptions->overlapRequire, mOptions->overlapDiffPercentLimit/100.0);
467 statInsertSize(r1, r2, ov, frontTrimmed1, frontTrimmed2);
468 isizeEvaluated = true;
469 }
470
471 if(r1 != NULL && r2!=NULL) {
472 if(mOptions->polyXTrim.enabled)
473 PolyX::trimPolyX(r1, r2, config->getFilterResult(), mOptions->polyXTrim.minLen);
474 }
475
476 if(r1 != NULL && r2!=NULL) {
477 if( mOptions->trim.maxLen1 > 0 && mOptions->trim.maxLen1 < r1->length())
478 r1->resize(mOptions->trim.maxLen1);
479 if( mOptions->trim.maxLen2 > 0 && mOptions->trim.maxLen2 < r2->length())
480 r2->resize(mOptions->trim.maxLen2);
481 }
482
483 Read* merged = NULL;
484 // merging mode
485 bool mergeProcessed = false;
486 if(mOptions->merge.enabled && r1 && r2) {
487 OverlapResult ov = OverlapAnalysis::analyze(r1, r2, mOptions->overlapDiffLimit, mOptions->overlapRequire, mOptions->overlapDiffPercentLimit/100.0);
488 if(ov.overlapped) {
489 merged = OverlapAnalysis::merge(r1, r2, ov);
490 int result = mFilter->passFilter(merged);
491 config->addFilterResult(result, 2);
492 if(result == PASS_FILTER) {
493 merged->appendToString(mergedOutput);
494 config->getPostStats1()->statRead(merged);
495 readPassed++;
496 mergedCount++;
497 }
498 recycleToPool1(tid, merged);
499 mergeProcessed = true;
500 } else if(mOptions->merge.includeUnmerged){
501 int result1 = mFilter->passFilter(r1);
502 config->addFilterResult(result1, 1);
503 if(result1 == PASS_FILTER && !dedupOut) {
504 r1->appendToString(mergedOutput);
505 config->getPostStats1()->statRead(r1);
506 }
507
508 int result2 = mFilter->passFilter(r2);
509 config->addFilterResult(result2, 1);
510 if(result2 == PASS_FILTER && !dedupOut) {
511 r2->appendToString(mergedOutput);
512 config->getPostStats1()->statRead(r2);
513 }
514 if(result1 == PASS_FILTER && result2 == PASS_FILTER )
515 readPassed++;
516 mergeProcessed = true;
517 }
518 }
519
520 if(!mergeProcessed) {
521
522 int result1 = mFilter->passFilter(r1);
523 int result2 = mFilter->passFilter(r2);
524
525 config->addFilterResult(max(result1, result2), 2);
526
527 if(!dedupOut) {
528
529 if( r1 != NULL && result1 == PASS_FILTER && r2 != NULL && result2 == PASS_FILTER ) {
530
531 if(mOptions->outputToSTDOUT && !mOptions->merge.enabled) {
532 r1->appendToString(singleOutput);
533 r2->appendToString(singleOutput);
534 } else {
535 r1->appendToString(outstr1);
536 r2->appendToString(outstr2);
537 }
538
539 // stats the read after filtering
540 if(!mOptions->merge.enabled) {
541 config->getPostStats1()->statRead(r1);
542 config->getPostStats2()->statRead(r2);
543 }
544
545 readPassed++;
546 } else if( r1 != NULL && result1 == PASS_FILTER) {
547 if(mUnpairedLeftWriter) {
548 r1->appendToString(unpairedOut1);
549 if(mFailedWriter)
550 or2->appendToStringWithTag(failedOut, FAILED_TYPES[result2]);
551 } else {
552 if(mFailedWriter) {
553 or1->appendToStringWithTag(failedOut, "paired_read_is_failing");
554 or2->appendToStringWithTag(failedOut, FAILED_TYPES[result2]);
555 }
556 }
557 } else if( r2 != NULL && result2 == PASS_FILTER) {
558 if(mUnpairedRightWriter) {
559 r2->appendToString(unpairedOut2);
560 if(mFailedWriter)
561 or1->appendToStringWithTag(failedOut,FAILED_TYPES[result1]);
562 } else if(mUnpairedLeftWriter) {
563 r2->appendToString(unpairedOut1);
564 if(mFailedWriter)
565 or1->appendToStringWithTag(failedOut,FAILED_TYPES[result1]);
566 } else {
567 if(mFailedWriter) {
568 or1->appendToStringWithTag(failedOut, FAILED_TYPES[result1]);
569 or2->appendToStringWithTag(failedOut, "paired_read_is_failing");
570 }
571 }
572 }
573 }
574 }
575
576 // if no trimming applied, r1 should be identical to or1
577 if(r1 != or1 && r1 != NULL) {
578 recycleToPool1(tid, r1);
579 r1 = NULL;
580 }
581 // if no trimming applied, r1 should be identical to or1
582 if(r2 != or2 && r2 != NULL) {
583 recycleToPool2(tid, r2);
584 r2 = NULL;
585 }
586
587 if(or1) {
588 recycleToPool1(tid, or1);
589 or1 = NULL;
590 }
591 if(or2) {
592 recycleToPool2(tid, or2);
593 or2 = NULL;
594 }
595 }
596
597 if(mOptions->outputToSTDOUT) {
598 // STDOUT output
599 // if it's merging mode, write the merged reads to STDOUT
600 // otherwise write interleaved single output
601 if(mOptions->merge.enabled)
602 fwrite(mergedOutput->c_str(), 1, mergedOutput->length(), stdout);
603 else
604 fwrite(singleOutput->c_str(), 1, singleOutput->length(), stdout);
605 } else if(mOptions->split.enabled) {
606 // split output by each worker thread
607 if(!mOptions->out1.empty())
608 config->getWriter1()->writeString(outstr1);
609 if(!mOptions->out2.empty())
610 config->getWriter2()->writeString(outstr2);
611 }
612
613 if(mMergedWriter) {
614 // write merged data
615 mMergedWriter->input(tid, mergedOutput);
616 mergedOutput = NULL;
617 }
618
619 if(mFailedWriter) {
620 // write failed data
621 mFailedWriter->input(tid, failedOut);
622 failedOut = NULL;
623 }
624
625 if(mOverlappedWriter) {
626 // write failed data
627 mOverlappedWriter->input(tid, overlappedOut);
628 overlappedOut = NULL;
629 }
630
631 // normal output by left/right writer thread
632 if(mRightWriter && mLeftWriter) {
633 // write PE
634 mLeftWriter->input(tid, outstr1);
635 outstr1 = NULL;
636
637 mRightWriter->input(tid, outstr2);
638 outstr2 = NULL;
639 } else if(mLeftWriter) {
640 // write singleOutput
641 mLeftWriter->input(tid, singleOutput);
642 singleOutput = NULL;
643 }
644 // output unpaired reads
645 if(mUnpairedLeftWriter && mUnpairedRightWriter) {
646 // write PE
647 mUnpairedLeftWriter->input(tid, unpairedOut1);
648 unpairedOut1 = NULL;
649
650 mUnpairedRightWriter->input(tid, unpairedOut2);
651 unpairedOut2 = NULL;
652 } else if(mUnpairedLeftWriter) {
653 mUnpairedLeftWriter->input(tid, unpairedOut1);
654 unpairedOut1 = NULL;
655 }
656
657 if(mOptions->split.byFileLines)
658 config->markProcessed(readPassed);
659 else
660 config->markProcessed(leftPack->count);
661
662 if(mOptions->merge.enabled) {
663 config->addMergedPairs(mergedCount);
664 }
665
666 if(outstr1)
667 delete outstr1;
668 if(outstr2)
669 delete outstr2;
670 if(unpairedOut1)
671 delete unpairedOut1;
672 if(unpairedOut2)
673 delete unpairedOut2;
674 if(singleOutput)
675 delete singleOutput;
676 if(mergedOutput)
677 delete mergedOutput;
678 if(failedOut)
679 delete failedOut;
680 if(overlappedOut)
681 delete overlappedOut;
682
683 delete leftPack->data;
684 delete rightPack->data;
685 delete leftPack;
686 delete rightPack;
687
688 mPackProcessedCounter++;
689
690 return true;
691 }
692
statInsertSize(Read * r1,Read * r2,OverlapResult & ov,int frontTrimmed1,int frontTrimmed2)693 void PairEndProcessor::statInsertSize(Read* r1, Read* r2, OverlapResult& ov, int frontTrimmed1, int frontTrimmed2) {
694 int isize = mOptions->insertSizeMax;
695 if(ov.overlapped) {
696 if(ov.offset > 0)
697 isize = r1->length() + r2->length() - ov.overlap_len + frontTrimmed1 + frontTrimmed2;
698 else
699 isize = ov.overlap_len + frontTrimmed1 + frontTrimmed2;
700 }
701
702 if(isize > mOptions->insertSizeMax)
703 isize = mOptions->insertSizeMax;
704
705 mInsertSizeHist[isize]++;
706 }
707
readerTask(bool isLeft)708 void PairEndProcessor::readerTask(bool isLeft)
709 {
710 if(mOptions->verbose) {
711 if(isLeft)
712 loginfo("start to load data of read1");
713 else
714 loginfo("start to load data of read2");
715 }
716 long lastReported = 0;
717 int slept = 0;
718 long readNum = 0;
719 bool splitSizeReEvaluated = false;
720 Read** data = new Read*[PACK_SIZE];
721 memset(data, 0, sizeof(Read*)*PACK_SIZE);
722 FastqReader* reader = NULL;
723 if(isLeft) {
724 reader = new FastqReader(mOptions->in1, true, mOptions->phred64);
725 reader->setReadPool(mLeftReadPool);
726 }
727 else {
728 reader = new FastqReader(mOptions->in2, true, mOptions->phred64);
729 reader->setReadPool(mRightReadPool);
730 }
731
732 int count=0;
733 bool needToBreak = false;
734 while(true){
735 Read* read = reader->read();
736 if(!read || needToBreak){
737 // the last pack
738 ReadPack* pack = new ReadPack;
739 pack->data = data;
740 pack->count = count;
741
742 if(isLeft) {
743 mLeftInputLists[mLeftPackReadCounter % mOptions->thread]->produce(pack);
744 mLeftPackReadCounter++;
745 } else {
746 mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(pack);
747 mRightPackReadCounter++;
748 }
749 data = NULL;
750 if(read) {
751 delete read;
752 read = NULL;
753 }
754 break;
755 }
756 data[count] = read;
757 count++;
758 // configured to process only first N reads
759 if(mOptions->readsToProcess >0 && count + readNum >= mOptions->readsToProcess) {
760 needToBreak = true;
761 }
762 if(mOptions->verbose && count + readNum >= lastReported + 1000000) {
763 lastReported = count + readNum;
764 string msg;
765 if(isLeft)
766 msg = "Read1: ";
767 else
768 msg = "Read2: ";
769 msg += "loaded " + to_string((lastReported/1000000)) + "M reads";
770 loginfo(msg);
771 }
772 // a full pack
773 if(count == PACK_SIZE || needToBreak){
774 ReadPack* pack = new ReadPack;
775 pack->data = data;
776 pack->count = count;
777
778 if(isLeft) {
779 mLeftInputLists[mLeftPackReadCounter % mOptions->thread]->produce(pack);
780 mLeftPackReadCounter++;
781 } else {
782 mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(pack);
783 mRightPackReadCounter++;
784 }
785
786 //re-initialize data for next pack
787 data = new Read*[PACK_SIZE];
788 memset(data, 0, sizeof(Read*)*PACK_SIZE);
789 // if the processor is far behind this reader, sleep and wait to limit memory usage
790 if(isLeft) {
791 while(mLeftPackReadCounter - mPackProcessedCounter > PACK_IN_MEM_LIMIT){
792 //cerr<<"sleep"<<endl;
793 slept++;
794 usleep(100);
795 }
796 } else {
797 while(mRightPackReadCounter - mPackProcessedCounter > PACK_IN_MEM_LIMIT){
798 //cerr<<"sleep"<<endl;
799 slept++;
800 usleep(100);
801 }
802 }
803 readNum += count;
804 // if the writer threads are far behind this producer, sleep and wait
805 // check this only when necessary
806 if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) {
807 while( (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) || (mRightWriter && mRightWriter->bufferLength() > PACK_IN_MEM_LIMIT) ){
808 slept++;
809 usleep(1000);
810 }
811 }
812 // reset count to 0
813 count = 0;
814 // re-evaluate split size
815 // TODO: following codes are commented since it may cause threading related conflicts in some systems
816 /*if(mOptions->split.needEvaluation && !splitSizeReEvaluated && readNum >= mOptions->split.size) {
817 splitSizeReEvaluated = true;
818 // greater than the initial evaluation
819 if(readNum >= 1024*1024) {
820 size_t bytesRead;
821 size_t bytesTotal;
822 reader.getBytes(bytesRead, bytesTotal);
823 mOptions->split.size *= (double)bytesTotal / ((double)bytesRead * (double) mOptions->split.number);
824 if(mOptions->split.size <= 0)
825 mOptions->split.size = 1;
826 }
827 }*/
828 }
829 }
830
831 for(int t=0; t<mOptions->thread; t++) {
832 if(isLeft)
833 mLeftInputLists[t]->setProducerFinished();
834 else
835 mRightInputLists[t]->setProducerFinished();
836 }
837
838 if(mOptions->verbose) {
839 if(isLeft) {
840 mLeftReaderFinished = true;
841 loginfo("Read1: loading completed with " + to_string(mLeftPackReadCounter) + " packs");
842 }
843 else {
844 mRightReaderFinished = true;
845 loginfo("Read2: loading completed with " + to_string(mRightPackReadCounter) + " packs");
846 }
847 }
848
849
850 // if the last data initialized is not used, free it
851 if(data != NULL)
852 delete[] data;
853 if(reader != NULL)
854 delete reader;
855 }
856
interleavedReaderTask()857 void PairEndProcessor::interleavedReaderTask()
858 {
859 if(mOptions->verbose)
860 loginfo("start to load data");
861 long lastReported = 0;
862 int slept = 0;
863 long readNum = 0;
864 bool splitSizeReEvaluated = false;
865 Read** dataLeft = new Read*[PACK_SIZE];
866 Read** dataRight = new Read*[PACK_SIZE];
867 memset(dataLeft, 0, sizeof(Read*)*PACK_SIZE);
868 memset(dataRight, 0, sizeof(Read*)*PACK_SIZE);
869 FastqReaderPair reader(mOptions->in1, mOptions->in2, true, mOptions->phred64,true);
870 int count=0;
871 bool needToBreak = false;
872 while(true){
873 ReadPair* pair = reader.read();
874 // TODO: put needToBreak here is just a WAR for resolve some unidentified dead lock issue
875 if(!pair || needToBreak){
876 // the last pack
877 ReadPack* packLeft = new ReadPack;
878 ReadPack* packRight = new ReadPack;
879 packLeft->data = dataLeft;
880 packRight->data = dataRight;
881 packLeft->count = count;
882 packRight->count = count;
883
884 mLeftInputLists[mLeftPackReadCounter % mOptions->thread]->produce(packLeft);
885 mLeftPackReadCounter++;
886
887 mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(packRight);
888 mRightPackReadCounter++;
889
890 dataLeft = NULL;
891 dataRight = NULL;
892 if(pair) {
893 delete pair;
894 pair = NULL;
895 }
896 break;
897 }
898 dataLeft[count] = pair->mLeft;
899 dataRight[count] = pair->mRight;
900 count++;
901 // configured to process only first N reads
902 if(mOptions->readsToProcess >0 && count + readNum >= mOptions->readsToProcess) {
903 needToBreak = true;
904 }
905 if(mOptions->verbose && count + readNum >= lastReported + 1000000) {
906 lastReported = count + readNum;
907 string msg = "loaded " + to_string((lastReported/1000000)) + "M read pairs";
908 loginfo(msg);
909 }
910 // a full pack
911 if(count == PACK_SIZE || needToBreak){
912 ReadPack* packLeft = new ReadPack;
913 ReadPack* packRight = new ReadPack;
914 packLeft->data = dataLeft;
915 packRight->data = dataRight;
916 packLeft->count = count;
917 packRight->count = count;
918
919 mLeftInputLists[mLeftPackReadCounter % mOptions->thread]->produce(packLeft);
920 mLeftPackReadCounter++;
921
922 mRightInputLists[mRightPackReadCounter % mOptions->thread]->produce(packRight);
923 mRightPackReadCounter++;
924
925 //re-initialize data for next pack
926 dataLeft = new Read*[PACK_SIZE];
927 dataRight = new Read*[PACK_SIZE];
928 memset(dataLeft, 0, sizeof(Read*)*PACK_SIZE);
929 memset(dataRight, 0, sizeof(Read*)*PACK_SIZE);
930 // if the consumer is far behind this producer, sleep and wait to limit memory usage
931 while(mLeftPackReadCounter - mPackProcessedCounter > PACK_IN_MEM_LIMIT){
932 //cerr<<"sleep"<<endl;
933 slept++;
934 usleep(100);
935 }
936 readNum += count;
937 // if the writer threads are far behind this producer, sleep and wait
938 // check this only when necessary
939 if(readNum % (PACK_SIZE * PACK_IN_MEM_LIMIT) == 0 && mLeftWriter) {
940 while( (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) || (mRightWriter && mRightWriter->bufferLength() > PACK_IN_MEM_LIMIT) ){
941 slept++;
942 usleep(1000);
943 }
944 }
945 // reset count to 0
946 count = 0;
947 // re-evaluate split size
948 // TODO: following codes are commented since it may cause threading related conflicts in some systems
949 /*if(mOptions->split.needEvaluation && !splitSizeReEvaluated && readNum >= mOptions->split.size) {
950 splitSizeReEvaluated = true;
951 // greater than the initial evaluation
952 if(readNum >= 1024*1024) {
953 size_t bytesRead;
954 size_t bytesTotal;
955 reader.mLeft->getBytes(bytesRead, bytesTotal);
956 mOptions->split.size *= (double)bytesTotal / ((double)bytesRead * (double) mOptions->split.number);
957 if(mOptions->split.size <= 0)
958 mOptions->split.size = 1;
959 }
960 }*/
961 }
962 }
963
964 for(int t=0; t<mOptions->thread; t++) {
965 mLeftInputLists[t]->setProducerFinished();
966 mRightInputLists[t]->setProducerFinished();
967 }
968
969 if(mOptions->verbose) {
970 loginfo("interleaved: loading completed with " + to_string(mLeftPackReadCounter) + " packs");
971 }
972
973 mLeftReaderFinished = true;
974 mRightReaderFinished = true;
975
976 // if the last data initialized is not used, free it
977 if(dataLeft != NULL)
978 delete[] dataLeft;
979 if(dataRight != NULL)
980 delete[] dataRight;
981 }
982
processorTask(ThreadConfig * config)983 void PairEndProcessor::processorTask(ThreadConfig* config)
984 {
985 SingleProducerSingleConsumerList<ReadPack*>* inputLeft = config->getLeftInput();
986 SingleProducerSingleConsumerList<ReadPack*>* inputRight = config->getRightInput();
987 while(true) {
988 if(config->canBeStopped()){
989 break;
990 }
991 while(inputLeft->canBeConsumed() && inputRight->canBeConsumed()) {
992 ReadPack* dataLeft = inputLeft->consume();
993 ReadPack* dataRight = inputRight->consume();
994 processPairEnd(dataLeft, dataRight, config);
995 }
996 if(inputLeft->isProducerFinished() && !inputLeft->canBeConsumed()) {
997 break;
998 } else if(inputRight->isProducerFinished() && !inputRight->canBeConsumed()) {
999 break;
1000 } else {
1001 usleep(100);
1002 }
1003 }
1004 inputLeft->setConsumerFinished();
1005 inputRight->setConsumerFinished();
1006
1007 mFinishedThreads++;
1008 if(mOptions->verbose) {
1009 string msg = "thread " + to_string(config->getThreadId() + 1) + " data processing completed";
1010 loginfo(msg);
1011 }
1012
1013 if(mFinishedThreads == mOptions->thread) {
1014 if(mLeftWriter)
1015 mLeftWriter->setInputCompleted();
1016 if(mRightWriter)
1017 mRightWriter->setInputCompleted();
1018 if(mUnpairedLeftWriter)
1019 mUnpairedLeftWriter->setInputCompleted();
1020 if(mUnpairedRightWriter)
1021 mUnpairedRightWriter->setInputCompleted();
1022 if(mMergedWriter)
1023 mMergedWriter->setInputCompleted();
1024 if(mFailedWriter)
1025 mFailedWriter->setInputCompleted();
1026 if(mOverlappedWriter)
1027 mOverlappedWriter->setInputCompleted();
1028 }
1029
1030 if(mOptions->verbose) {
1031 string msg = "thread " + to_string(config->getThreadId() + 1) + " finished";
1032 loginfo(msg);
1033 }
1034 }
1035
writerTask(WriterThread * config)1036 void PairEndProcessor::writerTask(WriterThread* config)
1037 {
1038 while(true) {
1039 if(config->isCompleted()){
1040 // last check for possible threading related issue
1041 config->output();
1042 break;
1043 }
1044 config->output();
1045 }
1046
1047 if(mOptions->verbose) {
1048 string msg = config->getFilename() + " writer finished";
1049 loginfo(msg);
1050 }
1051 }
1052