1 package var2; 2 3 import java.io.PrintStream; 4 import java.util.ArrayList; 5 import java.util.Locale; 6 7 import fileIO.ByteFile; 8 import fileIO.FileFormat; 9 import fileIO.ReadWrite; 10 import shared.Parse; 11 import shared.Parser; 12 import shared.PreParser; 13 import shared.ReadStats; 14 import shared.Shared; 15 import shared.Timer; 16 import shared.Tools; 17 import stream.ConcurrentReadInputStream; 18 import stream.ConcurrentReadOutputStream; 19 import stream.FastaReadInputStream; 20 import stream.Read; 21 import stream.SamLine; 22 import stream.SamReadStreamer; 23 import stream.SamStreamer; 24 import structures.ListNum; 25 26 /** 27 * Removes lines with unsupported variations from a sam file. 28 * 29 * @author Brian Bushnell 30 * @date January 25, 2018 31 * 32 */ 33 public class FilterSam { 34 35 /*--------------------------------------------------------------*/ 36 /*---------------- Initialization ----------------*/ 37 /*--------------------------------------------------------------*/ 38 39 /** 40 * Code entrance from the command line. 41 * @param args Command line arguments 42 */ main(String[] args)43 public static void main(String[] args){ 44 //Start a timer immediately upon code entrance. 45 Timer t=new Timer(); 46 47 //Create an instance of this class 48 FilterSam x=new FilterSam(args); 49 50 //Run the object 51 x.process(t); 52 53 //Close the print stream if it was redirected 54 Shared.closeStream(x.outstream); 55 } 56 57 /** 58 * Constructor. 59 * @param args Command line arguments 60 */ FilterSam(String[] args)61 public FilterSam(String[] args){ 62 63 {//Preparse block for help, config files, and outstream 64 PreParser pp=new PreParser(args, getClass(), false); 65 args=pp.args; 66 outstream=pp.outstream; 67 } 68 69 //Set shared static variables 70 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; 71 ReadWrite.MAX_ZIP_THREADS=Shared.threads(); 72 SamLine.SET_FROM_OK=true; 73 Var.CALL_INS=false; 74 Var.CALL_DEL=false; 75 76 //Create a parser object 77 Parser parser=new Parser(); 78 79 //Parse each argument 80 for(int i=0; i<args.length; i++){ 81 String arg=args[i]; 82 83 //Break arguments into their constituent parts, in the form of "a=b" 84 String[] split=arg.split("="); 85 String a=split[0].toLowerCase(); 86 String b=split.length>1 ? split[1] : null; 87 88 if(a.equals("verbose")){ 89 verbose=Parse.parseBoolean(b); 90 }else if(a.equals("ordered")){ 91 ordered=Parse.parseBoolean(b); 92 }else if(a.equals("ss") || a.equals("streamer")){ 93 useStreamer=Parse.parseBoolean(b); 94 }else if(a.equals("ploidy")){ 95 ploidy=Integer.parseInt(b); 96 }else if(a.equals("prefilter")){ 97 prefilter=Parse.parseBoolean(b); 98 }else if(a.equals("ref")){ 99 ref=b; 100 }else if(a.equals("outbad") || a.equals("outb")){ 101 outBad=b; 102 }else if(a.equals("vars") || a.equals("variants") || a.equals("varfile") || a.equals("inv")){ 103 varFile=b; 104 }else if(a.equals("vcf") || a.equals("vcffile")){ 105 vcfFile=b; 106 }else if(a.equals("maxsubs") || a.equals("subfilter")){ 107 maxSubs=Integer.parseInt(b); 108 }else if(a.equals("maxvars") || a.equals("varfilter")){ 109 maxVars=Integer.parseInt(b); 110 }else if(a.equals("maxbadsubs") || a.equals("maxbadvars") || a.equals("mbv")){ 111 maxBadVars=Integer.parseInt(b); 112 }else if(a.equals("maxbadsubdepth") || a.equals("maxbadvardepth") || a.equals("maxbadsuballeledepth") 113 || a.equals("maxbadvaralleledepth") || a.equals("mbsad") || a.equals("mbvad") || a.equals("mbad")){ 114 maxBadAlleleDepth=Integer.parseInt(b); 115 }else if(a.equals("maxbadallelefraction") || a.equals("mbaf")){ 116 maxBadAlleleFraction=Float.parseFloat(b); 117 }else if(a.equals("minbadsubreaddepth") || a.equals("minbadreaddepth") || a.equals("mbsrd") || a.equals("mbrd")){ 118 minBadReadDepth=Integer.parseInt(b); 119 }else if(a.equals("sub") || a.equals("subs")){ 120 Var.CALL_SUB=Parse.parseBoolean(b); 121 }else if(a.equals("ins") || a.equals("inss")){ 122 Var.CALL_INS=Parse.parseBoolean(b); 123 }else if(a.equals("del") || a.equals("dels")){ 124 Var.CALL_DEL=Parse.parseBoolean(b); 125 }else if(a.equals("indel") || a.equals("indels")){ 126 Var.CALL_INS=Var.CALL_DEL=Parse.parseBoolean(b); 127 }else if(a.equals("minedist") || a.equals("minenddist") || a.equals("border")){ 128 minEDist=Integer.parseInt(b); 129 }else if(a.equals("parse_flag_goes_here")){ 130 long fake_variable=Parse.parseKMG(b); 131 //Set a variable here 132 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser 133 //do nothing 134 }else{ 135 outstream.println("Unknown parameter "+args[i]); 136 assert(false) : "Unknown parameter "+args[i]; 137 } 138 } 139 140 {//Process parser fields 141 Parser.processQuality(); 142 143 maxReads=parser.maxReads; 144 145 overwrite=ReadStats.overwrite=parser.overwrite; 146 append=ReadStats.append=parser.append; 147 148 in1=parser.in1; 149 150 outGood=parser.out1; 151 } 152 153 assert(FastaReadInputStream.settingsOK()); 154 155 //Ensure there is an input file 156 // if(in1==null){throw new RuntimeException("Error - an input file and a VCF file are required.");} 157 if(in1==null){throw new RuntimeException("Error - an input file is required.");} 158 159 //Adjust the number of threads for input file reading 160 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ 161 ByteFile.FORCE_MODE_BF2=true; 162 } 163 164 //Ensure output files can be written 165 if(!Tools.testOutputFiles(overwrite, append, false, outGood, outBad)){ 166 outstream.println((outGood==null)+", "+(outBad==null)); 167 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+outGood+", "+outBad+"\n"); 168 } 169 170 //Ensure input files can be read 171 if(!Tools.testInputFiles(false, true, in1, vcfFile, varFile)){ 172 throw new RuntimeException("\nCan't read some input files.\n"); 173 } 174 175 //Ensure that no file was specified multiple times 176 if(!Tools.testForDuplicateFiles(true, in1, vcfFile, varFile, outBad, outGood)){ 177 throw new RuntimeException("\nSome file names were specified multiple times.\n"); 178 } 179 180 //Create output FileFormat objects 181 ffoutGood=FileFormat.testOutput(outGood, FileFormat.SAM, null, true, overwrite, append, ordered); 182 ffoutBad=FileFormat.testOutput(outBad, FileFormat.SAM, null, true, overwrite, append, ordered); 183 184 //Create input FileFormat objects 185 ffin1=FileFormat.testInput(in1, FileFormat.SAM, null, true, true); 186 187 Timer t=new Timer(outstream, true); 188 { 189 t.start(); 190 outstream.print("Loading scaffolds: "); 191 if(ref==null){ 192 scafMap=ScafMap.loadSamHeader(in1); 193 }else{ 194 scafMap=ScafMap.loadReference(ref, true); 195 } 196 assert(scafMap!=null && scafMap.size()>0) : "No scaffold names were loaded."; 197 t.stop(pad(scafMap.size(), 12)+" \t"); 198 } 199 t.start(); 200 if(varFile!=null){ 201 outstream.print("Loading vars: "); 202 varMap=VcfLoader.loadVarFile(varFile, scafMap); 203 t.stop(pad(varMap.size(), 12)+" \t"); 204 }else if(vcfFile!=null){t.start(); 205 outstream.print("Loading vcf: "); 206 varMap=VcfLoader.loadVcfFile(vcfFile, scafMap, true, false); 207 t.stop(pad(varMap.size(), 12)+" \t"); 208 }else{ 209 outstream.print("Calling variants:\n"); 210 String inString="in="+in1; 211 212 // private int maxBadAlleleDepth=2; 213 // /** Maximum allele fraction for variants considered unsupported */ 214 // private float maxBadAlleleFraction=0.01f; 215 216 String[] cvargs=new String[] {inString, "ref="+ref, "clearfilters", "minreads="+(maxBadAlleleDepth), 217 "minallelefraction="+maxBadAlleleFraction, "printexecuting=f"}; 218 CallVariants cv=new CallVariants(cvargs); 219 cv.prefilter=prefilter; 220 cv.ploidy=ploidy; 221 222 varMap=cv.process(new Timer()); 223 scafMap=cv.scafMap; 224 t.stop(/*pad(varMap.size(), 12)+" \t"*/); 225 } 226 227 assert(Var.CALL_INS || Var.CALL_DEL || Var.CALL_SUB) : "Must enable at least one of subs, insertions, or deletions."; 228 subsOnly=!Var.CALL_INS && !Var.CALL_DEL; 229 } 230 231 /*--------------------------------------------------------------*/ 232 /*---------------- Outer Methods ----------------*/ 233 /*--------------------------------------------------------------*/ 234 235 /** Create read streams and process all data */ process(Timer t)236 void process(Timer t){ 237 238 //Turn off read validation in the input threads to increase speed 239 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; 240 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; 241 242 243 final SamReadStreamer ss; 244 //Create a read input stream 245 final ConcurrentReadInputStream cris; 246 if(useStreamer){ 247 cris=null; 248 ss=new SamReadStreamer(ffin1, streamerThreads, true, maxReads); 249 ss.start(); 250 if(verbose){outstream.println("Started streamer");} 251 }else{ 252 ss=null; 253 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, null); 254 cris.start(); //Start the stream 255 if(verbose){outstream.println("Started cris");} 256 } 257 258 //Optionally create a read output stream 259 final ConcurrentReadOutputStream ros, rosb; 260 if(ffoutGood!=null){ 261 //Select output buffer size based on whether it needs to be ordered 262 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8); 263 264 ros=ConcurrentReadOutputStream.getStream(ffoutGood, null, null, null, buff, null, true); 265 ros.start(); //Start the stream 266 }else{ros=null;} 267 if(ffoutBad!=null){ 268 //Select output buffer size based on whether it needs to be ordered 269 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8); 270 271 rosb=ConcurrentReadOutputStream.getStream(ffoutBad, null, null, null, buff, null, false); 272 rosb.start(); //Start the stream 273 }else{rosb=null;} 274 275 //Reset counters 276 readsProcessed=0; 277 basesProcessed=0; 278 279 //Process the reads in separate threads 280 spawnThreads(cris, ss, ros, rosb); 281 282 if(verbose){outstream.println("Finished; closing streams.");} 283 284 //Write anything that was accumulated by ReadStats 285 errorState|=ReadStats.writeAll(); 286 //Close the read streams 287 errorState|=ReadWrite.closeStreams(cris, ros, rosb); 288 289 //Reset read validation 290 Read.VALIDATE_IN_CONSTRUCTOR=vic; 291 292 //Report timing and results 293 { 294 t.stop(); 295 296 //Calculate units per nanosecond 297 double rpnano=readsProcessed/(double)(t.elapsed); 298 double bpnano=basesProcessed/(double)(t.elapsed); 299 300 final long rg=readsOut, rb=readsProcessed-readsOut; 301 final long bg=basesOut, bb=basesProcessed-basesOut; 302 303 //Add "k" and "m" for large numbers 304 // String rpstring=(readsProcessed<100000 ? ""+readsProcessed : readsProcessed<100000000 ? (readsProcessed/1000)+"k" : (readsProcessed/1000000)+"m"); 305 // String bpstring=(basesProcessed<100000 ? ""+basesProcessed : basesProcessed<100000000 ? (basesProcessed/1000)+"k" : (basesProcessed/1000000)+"m"); 306 // String rgstring=(rg<100000 ? ""+rg : rg<100000000 ? (rg/1000)+"k" : (rg/1000000)+"m"); 307 // String bgstring=(bg<100000 ? ""+bg : bg<100000000 ? (bg/1000)+"k" : (bg/1000000)+"m"); 308 // String rbstring=(rb<100000 ? ""+rb : rb<100000000 ? (rb/1000)+"k" : (rb/1000000)+"m"); 309 // String bbstring=(bb<100000 ? ""+bb : bb<100000000 ? (bb/1000)+"k" : (bb/1000000)+"m"); 310 311 final int len=12; 312 313 final String rpstring=pad(readsProcessed, len); 314 final String bpstring=pad(basesProcessed, len); 315 final String rgstring=pad(rg, len); 316 final String bgstring=pad(bg, len); 317 final String rbstring=pad(rb, len); 318 final String bbstring=pad(bb, len); 319 320 final double mappedReadsDiscarded=mappedReadsProcessed-mappedReadsRetained; 321 double avgQGood=(bqSumGood/(double)mappedReadsRetained); 322 double avgQBad=(bqSumBad/(double)mappedReadsDiscarded); 323 double avgMapQGood=(mapqSumGood/(double)mappedReadsRetained); 324 double avgMapQBad=(mapqSumBad/(double)mappedReadsDiscarded); 325 double avgVarsGood=(varSumGood/(double)mappedReadsRetained); 326 double avgVarsBad=(varSumBad/(double)mappedReadsDiscarded); 327 final String avgQGoodS=pad(String.format(Locale.ROOT, "%.2f", avgQGood), len); 328 final String avgQBadS=pad(String.format(Locale.ROOT, "%.2f", avgQBad), len); 329 final String avgMapQGoodS=pad(String.format(Locale.ROOT, "%.2f", avgMapQGood), len); 330 final String avgMapQBadS=pad(String.format(Locale.ROOT, "%.2f", avgMapQBad), len); 331 final String avgVarsGoodS=pad(String.format(Locale.ROOT, "%.2f", avgVarsGood), len); 332 final String avgVarsBadS=pad(String.format(Locale.ROOT, "%.2f", avgVarsBad), len); 333 334 outstream.println("Time: \t\t"+t); 335 outstream.println("Reads Processed: "+rpstring+" \t"+String.format(Locale.ROOT, "%.2fk reads/sec", rpnano*1000000)); 336 outstream.println("Bases Processed: "+bpstring+" \t"+String.format(Locale.ROOT, "%.2fm bases/sec", bpnano*1000)); 337 outstream.println(); 338 outstream.println("Reads Retained: "+rgstring+" \t"+String.format(Locale.ROOT, "%.2f%%", (rg*100.0/readsProcessed))); 339 outstream.println("Bases Retained: "+bgstring+" \t"+String.format(Locale.ROOT, "%.2f%%", (bg*100.0/basesProcessed))); 340 outstream.println("Avg. Qual Retained: "+avgQGoodS); 341 outstream.println("Avg. MapQ Retained: "+avgMapQGoodS); 342 outstream.println("Avg. Vars Retained: "+avgVarsGoodS); 343 outstream.println(); 344 outstream.println("Reads Discarded: "+rbstring+" \t"+String.format(Locale.ROOT, "%.2f%%", (rb*100.0/readsProcessed))); 345 outstream.println("Bases Discarded: "+bbstring+" \t"+String.format(Locale.ROOT, "%.2f%%", (bb*100.0/basesProcessed))); 346 outstream.println("Avg. Qual Discarded:"+avgQBadS); 347 outstream.println("Avg. MapQ Discarded:"+avgMapQBadS); 348 outstream.println("Avg. Vars Discarded:"+avgVarsBadS); 349 } 350 351 //Throw an exception of there was an error in a thread 352 if(errorState){ 353 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); 354 } 355 } 356 357 private static String pad(long s, int len){ 358 return pad(""+s, len); 359 } 360 361 private static String pad(String s, int len){ 362 while(s.length()<len){s=" "+s;} 363 return s; 364 } 365 366 /** Spawn process threads */ 367 private void spawnThreads(final ConcurrentReadInputStream cris, final SamStreamer ss, final ConcurrentReadOutputStream ros, final ConcurrentReadOutputStream rosb){ 368 369 //Do anything necessary prior to processing 370 371 //Determine how many threads may be used 372 final int threads=Shared.threads(); 373 374 //Fill a list with ProcessThreads 375 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads); 376 for(int i=0; i<threads; i++){ 377 alpt.add(new ProcessThread(cris, ss, ros, rosb, i)); 378 } 379 380 //Start the threads 381 for(ProcessThread pt : alpt){ 382 pt.start(); 383 } 384 385 //Wait for completion of all threads 386 boolean success=true; 387 for(ProcessThread pt : alpt){ 388 389 //Wait until this thread has terminated 390 while(pt.getState()!=Thread.State.TERMINATED){ 391 try { 392 //Attempt a join operation 393 pt.join(); 394 } catch (InterruptedException e) { 395 //Potentially handle this, if it is expected to occur 396 e.printStackTrace(); 397 } 398 } 399 400 //Accumulate per-thread statistics 401 readsProcessed+=pt.readsProcessedT; 402 basesProcessed+=pt.basesProcessedT; 403 mappedReadsProcessed+=pt.mappedReadsProcessedT; 404 mappedBasesProcessed+=pt.mappedBasesProcessedT; 405 mappedReadsRetained+=pt.mappedReadsRetainedT; 406 mappedBasesRetained+=pt.mappedBasesRetainedT; 407 readsOut+=pt.readsOutT; 408 basesOut+=pt.basesOutT; 409 bqSumGood+=pt.qSumGoodT; 410 bqSumBad+=pt.qSumBadT; 411 // adSumGood+=pt.adSumGoodT; 412 // adSumBad+=pt.adSumBadT; 413 // rdSumGood+=pt.rdSumGoodT; 414 // rdSumBad+=pt.rdSumBadT; 415 varSumGood+=pt.varSumGoodT; 416 varSumBad+=pt.varSumBadT; 417 mapqSumGood+=pt.mapqSumGoodT; 418 mapqSumBad+=pt.mapqSumBadT; 419 success&=pt.success; 420 } 421 422 //Track whether any threads failed 423 if(!success){errorState=true;} 424 425 //Do anything necessary after processing 426 427 } 428 429 /*--------------------------------------------------------------*/ 430 /*---------------- Inner Methods ----------------*/ 431 /*--------------------------------------------------------------*/ 432 433 /*--------------------------------------------------------------*/ 434 /*---------------- Inner Classes ----------------*/ 435 /*--------------------------------------------------------------*/ 436 437 /** This class is static to prevent accidental writing to shared variables. 438 * It is safe to remove the static modifier. */ 439 private class ProcessThread extends Thread { 440 441 //Constructor 442 ProcessThread(final ConcurrentReadInputStream cris_, final SamStreamer ss_, final ConcurrentReadOutputStream ros_, final ConcurrentReadOutputStream rosb_, final int tid_){ 443 cris=cris_; 444 ss=ss_; 445 ros=ros_; 446 rosb=rosb_; 447 tid=tid_; 448 } 449 450 //Called by start() 451 @Override 452 public void run(){ 453 //Do anything necessary prior to processing 454 455 //Process the reads 456 if(useStreamer){ 457 processSS(); 458 }else{ 459 processCris(); 460 } 461 462 //Do anything necessary after processing 463 464 //Indicate successful exit status 465 success=true; 466 } 467 468 /** Iterate through the reads */ 469 void processCris(){ 470 471 //Grab the first ListNum of reads 472 ListNum<Read> ln=cris.nextList(); 473 //Grab the actual read list from the ListNum 474 ArrayList<Read> reads=(ln!=null ? ln.list : null); 475 476 //Check to ensure pairing is as expected 477 if(reads!=null && !reads.isEmpty()){ 478 Read r=reads.get(0); 479 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access 480 } 481 482 //As long as there is a nonempty read list... 483 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning 484 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access 485 486 ArrayList<Read> good=new ArrayList<Read>(reads.size()); 487 ArrayList<Read> bad=new ArrayList<Read>(Tools.max(1, reads.size()/4)); 488 489 //Loop through each read in the list 490 for(int idx=0; idx<reads.size(); idx++){ 491 final Read r1=reads.get(idx); 492 493 //Validate reads in worker threads 494 if(!r1.validated()){r1.validate(true);} 495 496 //Track the initial length for statistics 497 final int initialLength1=r1.length(); 498 499 //Increment counters 500 readsProcessedT++; 501 basesProcessedT+=initialLength1; 502 503 { 504 //Reads are processed in this block. 505 boolean keep=processRead(r1); 506 if(keep){ 507 //Increment counters 508 readsOutT++; 509 basesOutT+=initialLength1; 510 good.add(r1); 511 }else{ 512 bad.add(r1); 513 } 514 } 515 } 516 517 //Output reads to the output stream 518 if(ros!=null){ros.add(good, ln.id);} 519 if(rosb!=null){rosb.add(bad, ln.id);} 520 521 //Notify the input stream that the list was used 522 cris.returnList(ln); 523 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access 524 525 //Fetch a new list 526 ln=cris.nextList(); 527 reads=(ln!=null ? ln.list : null); 528 } 529 530 //Notify the input stream that the final list was used 531 if(ln!=null){ 532 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); 533 } 534 } 535 536 /** Iterate through the reads */ 537 void processSS(){ 538 539 //Grab the first ListNum of reads 540 ListNum<Read> ln=ss.nextList(); 541 //Grab the actual read list from the ListNum 542 ArrayList<Read> reads=(ln!=null ? ln.list : null); 543 544 //Check to ensure pairing is as expected 545 if(reads!=null && !reads.isEmpty()){ 546 Read r=reads.get(0); 547 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access 548 } 549 550 //As long as there is a nonempty read list... 551 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning 552 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access 553 554 ArrayList<Read> good=new ArrayList<Read>(reads.size()); 555 ArrayList<Read> bad=new ArrayList<Read>(Tools.max(1, reads.size()/4)); 556 557 //Loop through each read in the list 558 for(int idx=0; idx<reads.size(); idx++){ 559 final Read r1=reads.get(idx); 560 561 //Validate reads in worker threads 562 if(!r1.validated()){r1.validate(true);} 563 564 //Track the initial length for statistics 565 final int initialLength1=r1.length(); 566 567 //Increment counters 568 readsProcessedT++; 569 basesProcessedT+=initialLength1; 570 571 { 572 //Reads are processed in this block. 573 boolean keep=processRead(r1); 574 if(keep){ 575 //Increment counters 576 readsOutT++; 577 basesOutT+=initialLength1; 578 good.add(r1); 579 }else{ 580 bad.add(r1); 581 } 582 } 583 } 584 585 //Output reads to the output stream 586 if(ros!=null){ros.add(good, ln.id);} 587 if(rosb!=null){rosb.add(bad, ln.id);} 588 589 //Fetch a new list 590 ln=ss.nextList(); 591 reads=(ln!=null ? ln.list : null); 592 } 593 594 //Notify the input stream that the final list was used 595 if(ln!=null){ 596 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); 597 } 598 } 599 600 /** 601 * Process a read. 602 * @param r1 Read 1 603 * @return True if the read should be kept, false if it should be discarded. 604 */ 605 boolean processRead(final Read r1){ 606 return passesVariantFilter(r1); 607 } 608 609 private final boolean passesVariantFilter(Read r){ 610 if(!r.mapped() || r.bases==null || r.samline==null || r.match==null){return true;} 611 final int vars=(subsOnly ? Read.countSubs(r.match) : Read.countVars(r.match, Var.CALL_SUB, Var.CALL_INS, Var.CALL_DEL)); 612 final int len=r.length(); 613 final double q=r.avgQualityByProbabilityDouble(false, r.length()); 614 final SamLine sl=r.samline; 615 mappedReadsProcessedT++; 616 mappedBasesProcessedT+=len; 617 618 if(vars==0){ 619 varSumGoodT+=vars; 620 qSumGoodT+=q; 621 mapqSumGoodT+=sl.mapq; 622 mappedReadsRetainedT++; 623 mappedBasesRetainedT+=len; 624 return true; 625 } 626 627 if((maxVars>=0 && vars>maxVars) || 628 (maxSubs>=0 && vars>maxSubs && ((subsOnly ? vars : Read.countSubs(r.match))>maxSubs))){ 629 varSumBadT+=vars; 630 qSumBadT+=q; 631 mapqSumBadT+=sl.mapq; 632 return false; 633 } 634 635 if(vars<=maxBadVars){ 636 varSumGoodT+=vars; 637 qSumGoodT+=q; 638 mapqSumGoodT+=sl.mapq; 639 mappedReadsRetainedT++; 640 mappedBasesRetainedT+=len; 641 return true; 642 } 643 644 final ArrayList<Var> list; 645 if(subsOnly){ 646 list=CallVariants.findUniqueSubs(r, sl, varMap, scafMap, maxBadAlleleDepth, maxBadAlleleFraction, minBadReadDepth, minEDist); 647 }else{ 648 list=CallVariants.findUniqueVars(r, sl, varMap, scafMap, maxBadAlleleDepth, maxBadAlleleFraction, minBadReadDepth, minEDist); 649 } 650 if(list==null || list.size()<=maxBadVars){ 651 varSumGoodT+=vars; 652 qSumGoodT+=q; 653 mapqSumGoodT+=sl.mapq; 654 mappedReadsRetainedT++; 655 mappedBasesRetainedT+=len; 656 return true; 657 }else{ 658 assert(list.size()>maxBadVars) : list.size()+", "+maxBadVars; 659 for(Var v : list){ 660 assert(v!=null) : sl.cigar+"\n"+vars+"\n"+v+"\n"+list; 661 assert((v.type()==Var.SUB && Var.CALL_SUB) || (v.type()==Var.INS && Var.CALL_INS) || (v.type()==Var.DEL && Var.CALL_DEL)) : 662 list.size()+", "+maxBadVars+"\n"+sl.cigar+"\n"+v+"\n"+list; 663 } 664 varSumBadT+=vars; 665 qSumBadT+=q; 666 mapqSumBadT+=sl.mapq; 667 return false; 668 } 669 } 670 671 /** Number of reads processed by this thread */ 672 protected long readsProcessedT=0; 673 /** Number of reads processed by this thread */ 674 protected long basesProcessedT=0; 675 /** Number of mapped reads processed by this thread */ 676 protected long mappedReadsProcessedT=0; 677 /** Number of mapped bases processed by this thread */ 678 protected long mappedBasesProcessedT=0; 679 /** Number of mapped reads retained by this thread */ 680 protected long mappedReadsRetainedT=0; 681 /** Number of mapped bases retained by this thread */ 682 protected long mappedBasesRetainedT=0; 683 /** Number of good reads processed by this thread */ 684 protected long readsOutT=0; 685 /** Number of good bases processed by this thread */ 686 protected long basesOutT=0; 687 protected double qSumGoodT=0; 688 protected double qSumBadT=0; 689 protected long varSumGoodT=0; 690 protected long varSumBadT=0; 691 protected long mapqSumGoodT=0; 692 protected long mapqSumBadT=0; 693 694 /** True only if this thread has completed successfully */ 695 boolean success=false; 696 697 /** Shared input stream */ 698 private final ConcurrentReadInputStream cris; 699 /** Shared input stream */ 700 private final SamStreamer ss; 701 /** Good output stream */ 702 private final ConcurrentReadOutputStream ros; 703 /** Bad output stream */ 704 private final ConcurrentReadOutputStream rosb; 705 /** Thread ID */ 706 final int tid; 707 } 708 709 /*--------------------------------------------------------------*/ 710 /*---------------- Fields ----------------*/ 711 /*--------------------------------------------------------------*/ 712 713 /** Primary input file path */ 714 private String in1=null; 715 716 /** Optional reference */ 717 private String ref=null; 718 719 /** Good output file path */ 720 private String outGood=null; 721 /** Bad output file path */ 722 private String outBad=null; 723 724 /** VCF file path */ 725 private String varFile=null; 726 /** Variant file path */ 727 private String vcfFile=null; 728 private VarMap varMap=null; 729 private ScafMap scafMap=null; 730 731 /*--------------------------------------------------------------*/ 732 733 /** Maximum allowed substitutions in a read */ 734 private int maxSubs=-1; 735 /** Maximum allowed variations in a read */ 736 private int maxVars=-1; 737 738 /** Maximum allowed unsupported substitutions in a read */ 739 private int maxBadVars=1; 740 /** Maximum variant depth for a variant to be considered unsupported */ 741 private int maxBadAlleleDepth=2; 742 /** Maximum allele fraction for variants considered unsupported */ 743 private float maxBadAlleleFraction=0.01f; 744 /** Minimum read depth for a variant to be considered unsupported */ 745 private int minBadReadDepth=2; 746 /** Ignore vars within this distance of the ends */ 747 private int minEDist=5; 748 private int ploidy=1; 749 private boolean prefilter=false; 750 751 /** Number of reads processed */ 752 protected long readsProcessed=0; 753 /** Number of bases processed */ 754 protected long basesProcessed=0; 755 /** Number of mapped reads processed by this thread */ 756 protected long mappedReadsProcessed=0; 757 /** Number of mapped bases processed by this thread */ 758 protected long mappedBasesProcessed=0; 759 /** Number of mapped reads retained by this thread */ 760 protected long mappedReadsRetained=0; 761 /** Number of mapped bases retained by this thread */ 762 protected long mappedBasesRetained=0; 763 /** Number of good reads processed */ 764 protected long readsOut=0; 765 /** Number of good bases processed */ 766 protected long basesOut=0; 767 768 protected double bqSumGood=0; 769 protected double bqSumBad=0; 770 // protected double adSumGood=0; 771 // protected double adSumBad=0; 772 // protected double rdSumGood=0; 773 // protected double rdSumBad=0; 774 775 protected long varSumGood=0; 776 protected long varSumBad=0; 777 protected long mapqSumGood=0; 778 protected long mapqSumBad=0; 779 780 /** Quit after processing this many input reads; -1 means no limit */ 781 private long maxReads=-1; 782 783 static boolean useStreamer=true; 784 static int streamerThreads=3; 785 786 /*--------------------------------------------------------------*/ 787 /*---------------- Final Fields ----------------*/ 788 /*--------------------------------------------------------------*/ 789 790 /** Primary input file */ 791 private final FileFormat ffin1; 792 793 /** Primary output file */ 794 private final FileFormat ffoutGood; 795 /** Secondary output file */ 796 private final FileFormat ffoutBad; 797 798 private final boolean subsOnly; 799 800 /*--------------------------------------------------------------*/ 801 /*---------------- Common Fields ----------------*/ 802 /*--------------------------------------------------------------*/ 803 804 /** Print status messages to this output stream */ 805 private PrintStream outstream=System.err; 806 /** Print verbose messages */ 807 public static boolean verbose=false; 808 /** True if an error was encountered */ 809 public boolean errorState=false; 810 /** Overwrite existing output files */ 811 private boolean overwrite=false; 812 /** Append to existing output files */ 813 private boolean append=false; 814 /** Reads are output in input order */ 815 private boolean ordered=true; 816 817 } 818