1 package consensus; 2 3 import java.io.PrintStream; 4 import java.util.ArrayList; 5 import java.util.LinkedHashMap; 6 import java.util.Map.Entry; 7 8 import fileIO.ByteFile; 9 import fileIO.ByteStreamWriter; 10 import fileIO.FileFormat; 11 import fileIO.ReadWrite; 12 import prok.ProkObject; 13 import shared.Parse; 14 import shared.Parser; 15 import shared.PreParser; 16 import shared.ReadStats; 17 import shared.Shared; 18 import shared.Timer; 19 import shared.Tools; 20 import sketch.SketchObject; 21 import stream.ConcurrentReadInputStream; 22 import stream.ConcurrentReadOutputStream; 23 import stream.FASTQ; 24 import stream.FastaReadInputStream; 25 import stream.Read; 26 import stream.SamLine; 27 import stream.SamReadStreamer; 28 import stream.SamStreamer; 29 import structures.ByteBuilder; 30 import structures.ListNum; 31 import template.Accumulator; 32 import template.ThreadWaiter; 33 import var2.Realigner; 34 import var2.SamFilter; 35 36 /** 37 * Alters a reference to represent the consensus of aligned reads. 38 * 39 * @author Brian Bushnell 40 * @date September 6, 1019 41 * 42 */ 43 public class ConsensusMaker extends ConsensusObject implements Accumulator<ConsensusMaker.ProcessThread> { 44 45 /*--------------------------------------------------------------*/ 46 /*---------------- Initialization ----------------*/ 47 /*--------------------------------------------------------------*/ 48 49 /** 50 * Code entrance from the command line. 51 * @param args Command line arguments 52 */ main(String[] args)53 public static void main(String[] args){ 54 //Start a timer immediately upon code entrance. 55 Timer t=new Timer(); 56 57 //Create an instance of this class 58 ConsensusMaker x=new ConsensusMaker(args); 59 60 //Run the object 61 x.process(t); 62 63 //Close the print stream if it was redirected 64 Shared.closeStream(x.outstream); 65 } 66 67 /** 68 * Constructor. 69 * @param args Command line arguments 70 */ ConsensusMaker(String[] args)71 public ConsensusMaker(String[] args){ 72 73 {//Preparse block for help, config files, and outstream 74 PreParser pp=new PreParser(args, getClass(), false); 75 args=pp.args; 76 outstream=pp.outstream; 77 } 78 79 //Set shared static variables prior to parsing 80 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; 81 ReadWrite.MAX_ZIP_THREADS=Shared.threads(); 82 FASTQ.TEST_INTERLEAVED=FASTQ.FORCE_INTERLEAVED=false; 83 84 samFilter.includeUnmapped=false; 85 // samFilter.includeSupplimentary=false; 86 // samFilter.includeDuplicate=false; 87 samFilter.includeNonPrimary=false; 88 samFilter.includeQfail=false; 89 // samFilter.minMapq=4; 90 91 {//Parse the arguments 92 final Parser parser=parse(args); 93 94 Parser.processQuality(); 95 96 maxReads=parser.maxReads; 97 overwrite=ReadStats.overwrite=parser.overwrite; 98 append=ReadStats.append=parser.append; 99 100 in=parser.in1; 101 extin=parser.extin; 102 103 out=parser.out1; 104 extout=parser.extout; 105 silent=Parser.silent; 106 } 107 108 { 109 // if("auto".equalsIgnoreCase(atomic)){Scaffold.setCA3A(Shared.threads()>8);} 110 // else{Scaffold.setCA3A(Parse.parseBoolean(atomic));} 111 112 if(ploidy<1){System.err.println("WARNING: ploidy not set; assuming ploidy=1."); ploidy=1;} 113 samFilter.setSamtoolsFilter(); 114 115 streamerThreads=Tools.max(1, Tools.min(streamerThreads, Shared.threads())); 116 assert(streamerThreads>0) : streamerThreads; 117 } 118 119 validateParams(); 120 fixExtensions(); //Add or remove .gz or .bz2 as needed 121 checkFileExistence(); //Ensure files can be read and written 122 checkStatics(); //Adjust file-related static fields as needed for this program 123 124 //Create output FileFormat objects 125 ffout=FileFormat.testOutput(out, FileFormat.FASTA, extout, true, overwrite, append, ordered); 126 ffmodel=FileFormat.testOutput(outModel, FileFormat.ALM, null, true, overwrite, false, ordered); 127 128 //Create input FileFormat objects 129 ffin=FileFormat.testInput(in, FileFormat.SAM, extin, true, true); 130 ffref=FileFormat.testInput(ref, FileFormat.FASTA, null, true, true); 131 132 if(inModelFile!=null){ 133 ArrayList<BaseGraph> list=(ArrayList<BaseGraph>)ReadWrite.readObject(inModelFile, false); 134 inModel=list.get(0); 135 inModel.calcProbs(); 136 inModel.makeWeights(); 137 } 138 } 139 140 /*--------------------------------------------------------------*/ 141 /*---------------- Initialization Helpers ----------------*/ 142 /*--------------------------------------------------------------*/ 143 144 /** Parse arguments from the command line */ parse(String[] args)145 private Parser parse(String[] args){ 146 147 //Create a parser object 148 Parser parser=new Parser(); 149 150 //Set any necessary Parser defaults here 151 //parser.foo=bar; 152 153 //Parse each argument 154 for(int i=0; i<args.length; i++){ 155 String arg=args[i]; 156 157 //Break arguments into their constituent parts, in the form of "a=b" 158 String[] split=arg.split("="); 159 String a=split[0].toLowerCase(); 160 String b=split.length>1 ? split[1] : null; 161 if(b!=null && b.equalsIgnoreCase("null")){b=null;} 162 163 if(a.equals("verbose")){ 164 verbose=Parse.parseBoolean(b); 165 BaseGraph.verbose=verbose; 166 }else if(a.equals("ref")){ 167 ref=b; 168 }else if(a.equals("outm") || a.equals("outmodel") || a.equals("model") || a.equals("alm")){ 169 outModel=b; 170 }else if(a.equals("inm") || a.equals("inmodel")){ 171 inModelFile=b; 172 }else if(a.equals("hist") || a.equals("histogram")){ 173 outHistFile=b; 174 }else if(a.equals("realign")){ 175 realign=Parse.parseBoolean(b); 176 }else if(a.equals("printscores")){ 177 printScores=Parse.parseBoolean(b); 178 }else if(a.equalsIgnoreCase("useMapq")){ 179 useMapq=Parse.parseBoolean(b); 180 }else if(a.equalsIgnoreCase("identityCeiling") || a.equalsIgnoreCase("idceiling") || a.equalsIgnoreCase("ceiling")){ 181 double d=Double.parseDouble(b); 182 if(d<=2){d=d*100;} 183 identityCeiling=(int)d; 184 invertIdentity=true; 185 }else if(a.equalsIgnoreCase("invertIdentity")){ 186 invertIdentity=Parse.parseBoolean(b); 187 }else if(a.equals("name") || a.equals("rename") || a.equals("header")){ 188 name=b; 189 }else if(a.equals("noindels")){ 190 noIndels=Parse.parseBoolean(b); 191 }else if(a.equalsIgnoreCase("onlyConvertNs") || a.equalsIgnoreCase("nOnly") || a.equalsIgnoreCase("onlyN")){ 192 onlyConvertNs=Parse.parseBoolean(b); 193 }else if(a.equals("ploidy")){ 194 ploidy=Integer.parseInt(b); 195 }else if(a.equals("mindepth")){ 196 minDepth=Integer.parseInt(b); 197 }else if(a.equals("trimdepth") || a.equals("trimdepthfraction")){ 198 trimDepthFraction=Float.parseFloat(b); 199 }else if(a.equalsIgnoreCase("trimNs")){ 200 trimNs=Parse.parseBoolean(b); 201 }else if(a.equalsIgnoreCase("mafn") || a.equals("mafnoref")){ 202 MAF_noref=Float.parseFloat(b); 203 }else if(a.equalsIgnoreCase("mafsub")){ 204 MAF_sub=Float.parseFloat(b); 205 }else if(a.equalsIgnoreCase("mafdel")){ 206 MAF_del=Float.parseFloat(b); 207 }else if(a.equalsIgnoreCase("mafins")){ 208 MAF_ins=Float.parseFloat(b); 209 }else if(a.equalsIgnoreCase("maf")){ 210 MAF_ins=MAF_noref=MAF_del=MAF_sub=Float.parseFloat(b); 211 }else if(a.equalsIgnoreCase("mafindel")){ 212 MAF_ins=MAF_del=Float.parseFloat(b); 213 }else if(a.equals("clearfilters")){ 214 if(Parse.parseBoolean(b)){ 215 samFilter.clear(); 216 } 217 }else if(a.equals("parse_flag_goes_here")){ 218 long fake_variable=Parse.parseKMG(b); 219 //Set a variable here 220 }else if(samFilter.parse(arg, a, b)){ 221 //do nothing 222 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser 223 //do nothing 224 }else{ 225 outstream.println("Unknown parameter "+args[i]); 226 assert(false) : "Unknown parameter "+args[i]; 227 } 228 } 229 230 if(ref!=null && !Tools.existsInput(ref)){ 231 specialRef=ProkObject.isSpecialType(ref); 232 } 233 234 return parser; 235 } 236 237 /** Add or remove .gz or .bz2 as needed */ fixExtensions()238 private void fixExtensions(){ 239 in=Tools.fixExtension(in); 240 if(!specialRef){ref=Tools.fixExtension(ref);} 241 } 242 243 /** Ensure files can be read and written */ checkFileExistence()244 private void checkFileExistence(){ 245 246 //Ensure there is an input file 247 if(in==null){throw new RuntimeException("Error - an input file is required.");} 248 249 //Ensure there is an input file 250 if(ref==null){throw new RuntimeException("Error - a reference file is required.");} 251 252 //Ensure output files can be written 253 if(!Tools.testOutputFiles(overwrite, append, false, out, outModel, outHistFile)){ 254 outstream.println((out==null)+", "+out); 255 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out+" or "+outModel+"\n"); 256 } 257 258 //Ensure input files can be read 259 if(!Tools.testInputFiles(true, true, in, inModelFile, (specialRef ? null : ref))){ 260 throw new RuntimeException("\nCan't read some input files.\n"); 261 } 262 263 //Ensure that no file was specified multiple times 264 if(!Tools.testForDuplicateFiles(true, in, ref, out, outModel, outHistFile, inModelFile)){ 265 throw new RuntimeException("\nSome file names were specified multiple times.\n"); 266 } 267 } 268 269 /** Adjust file-related static fields as needed for this program */ checkStatics()270 private static void checkStatics(){ 271 //Adjust the number of threads for input file reading 272 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ 273 ByteFile.FORCE_MODE_BF2=true; 274 } 275 276 assert(FastaReadInputStream.settingsOK()); 277 } 278 279 /** Ensure parameter ranges are within bounds and required parameters are set */ validateParams()280 private boolean validateParams(){ 281 // assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo; 282 return true; 283 } 284 writeHist(String fname)285 private void writeHist(String fname){ 286 ByteStreamWriter bsw=new ByteStreamWriter(fname, overwrite, append, false); 287 bsw.start(); 288 bsw.print("#Value\tIdentity"); 289 if(inModel!=null){ 290 bsw.print("\tScore"); 291 } 292 bsw.nl(); 293 for(int i=0; i<idHist.length; i++){ 294 bsw.print(0.01f*i, 2).tab().print(idHist[i]); 295 if(inModel!=null){ 296 bsw.tab().print(scoreHist[i]); 297 } 298 bsw.nl(); 299 } 300 errorState=bsw.poisonAndWait()|errorState; 301 } 302 303 /*--------------------------------------------------------------*/ 304 /*---------------- Outer Methods ----------------*/ 305 /*--------------------------------------------------------------*/ 306 307 /** Create read streams and process all data */ process(Timer t)308 void process(Timer t){ 309 310 //Turn off read validation in the input threads to increase speed 311 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; 312 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; 313 314 //Load reference; 315 refMap=loadReferenceCustom(); 316 makeRefMap2(); 317 318 //Reset counters 319 readsProcessed=readsOut=0; 320 basesProcessed=basesOut=0; 321 alignedReads=0; 322 323 if(ffin.samOrBam()){ 324 //Create a read input stream 325 final SamStreamer ss=makeStreamer(ffin); 326 //Process the reads in separate threads 327 spawnThreads(ss); 328 }else{ 329 Shared.capBufferLen(40); 330 //Create a read input stream 331 final ConcurrentReadInputStream cris=makeCris(ffin); 332 //Process the reads in separate threads 333 spawnThreads(cris); 334 } 335 336 //Optionally create a read output stream 337 final ConcurrentReadOutputStream ros=makeCros(); 338 339 outputConsensus(ros); 340 341 if(outHistFile!=null){ 342 writeHist(outHistFile); 343 } 344 345 if(verbose){outstream.println("Finished; closing streams.");} 346 347 //Write anything that was accumulated by ReadStats 348 errorState|=ReadStats.writeAll(); 349 //Close the read streams 350 errorState|=ReadWrite.closeStream(ros); 351 352 //Reset read validation 353 Read.VALIDATE_IN_CONSTRUCTOR=vic; 354 355 //Report timing and results 356 t.stop(); 357 if(!silent){ 358 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); 359 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false)); 360 outstream.println(); 361 } 362 outstream.println(Tools.number("Ref Count:", refCount, 8)); 363 outstream.println(Tools.number("Sub Count:", subCount, 8)); 364 outstream.println(Tools.number("Del Count:", delCount, 8)); 365 outstream.println(Tools.number("Ins Count:", insCount, 8)); 366 outstream.println(Tools.number("Avg Identity:", 100*identitySum/alignedReads, 3, 8)); 367 if(scoreSum>0){ 368 outstream.println(Tools.number("Avg Score:", 100*scoreSum/alignedReads, 3, 8)); 369 } 370 371 //Throw an exception of there was an error in a thread 372 if(errorState){ 373 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); 374 } 375 } 376 377 private synchronized LinkedHashMap<String, BaseGraph> loadReferenceCustom(){ 378 assert(!loadedRef); 379 if(specialRef){return loadReferenceSpecial();} 380 ConcurrentReadInputStream cris=makeRefCris(); 381 LinkedHashMap<String, BaseGraph> map=new LinkedHashMap<String, BaseGraph>(); 382 ListNum<Read> ln=cris.nextList(); 383 for(; ln!=null && !ln.isEmpty(); ln=cris.nextList()){ 384 if(verbose){outstream.println("Fetched "+ln.size()+" reference sequences.");} 385 for(Read r : ln){ 386 if(r.bases!=null && r.bases.length>0){ 387 BaseGraph bg=new BaseGraph(r.id, r.bases, r.quality, r.numericID, 0); 388 map.put(r.name(), bg); 389 // if(inModel!=null && inModel.name.equals(bg.name)){ 390 //// assert(false) : Arrays.toString(bg.refWeights)+"\n"+Arrays.toString(inModel.refWeights); 391 // bg.refWeights=inModel.refWeights; 392 // bg.insWeights=inModel.insWeights; 393 // bg.delWeights=inModel.delWeights; 394 // }else{ 395 // bg.insWeights=new float[bg.ref.length]; 396 // bg.delWeights=new float[bg.ref.length]; 397 // Arrays.fill(bg.insWeights, 1); 398 // Arrays.fill(bg.delWeights, 1); 399 //// assert(false) : Arrays.toString(bg.delWeights); 400 // } 401 } 402 } 403 if(ln!=null){ 404 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); 405 } 406 } 407 if(verbose){outstream.println("Loaded "+map.size()+" reference sequences.");} 408 loadedRef=true; 409 return map; 410 } 411 412 //For ribo subunits in resources directory 413 private synchronized LinkedHashMap<String, BaseGraph> loadReferenceSpecial(){ 414 assert(!loadedRef); 415 Read[] array=ProkObject.loadConsensusSequenceType(ref, false, false); 416 Read r=array[0]; 417 BaseGraph bg=new BaseGraph(r.id, r.bases, r.quality, r.numericID, 0); 418 LinkedHashMap<String, BaseGraph> map=new LinkedHashMap<String, BaseGraph>(); 419 map.put(r.name(), bg); 420 return map; 421 } 422 423 private synchronized void makeRefMap2(){ 424 assert(refMap!=null && refMap2==null); 425 if(verbose){outstream.println("Making refMap2.");} 426 refMap2=new LinkedHashMap<String, BaseGraph>((refMap.size()*3)/2); 427 for(Entry<String, BaseGraph> e : refMap.entrySet()){ 428 String key=e.getKey(); 429 if(verbose){outstream.println("Considering "+key);} 430 BaseGraph value=e.getValue(); 431 String key2=Tools.trimToWhitespace(key); 432 if(verbose){outstream.println("key2="+key2);} 433 // if(verbose){outstream.println("put "+key2+", "+value);} 434 refMap2.put(key2, value); 435 // if(verbose){outstream.println("putted "+key2+", "+value);} 436 437 if(defaultRname==null){defaultRname=key;} 438 } 439 // assert(false) : refMap+"\n"+refMap2; 440 if(verbose){outstream.println("Made refMap2.");} 441 } 442 443 private ConcurrentReadInputStream makeRefCris(){ 444 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffref, null); 445 cris.start(); //Start the stream 446 if(verbose){outstream.println("Started cris");} 447 boolean paired=cris.paired(); 448 assert(!paired) : "References should not be paired."; 449 return cris; 450 } 451 452 private SamStreamer makeStreamer(FileFormat ff){ 453 if(ff==null){return null;} 454 SamStreamer ss=new SamReadStreamer(ff, streamerThreads, true, maxReads); 455 ss.start(); //Start the stream 456 if(verbose){outstream.println("Started Streamer");} 457 return ss; 458 } 459 460 private ConcurrentReadInputStream makeCris(FileFormat ff){ 461 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ff, null); 462 cris.start(); //Start the stream 463 if(verbose){outstream.println("Started cris");} 464 boolean paired=cris.paired(); 465 assert(!paired); 466 return cris; 467 } 468 469 private ConcurrentReadOutputStream makeCros(){ 470 if(ffout==null){return null;} 471 472 //Select output buffer size based on whether it needs to be ordered 473 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8); 474 475 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout, null, buff, null, false); 476 ros.start(); //Start the stream 477 return ros; 478 } 479 480 /*--------------------------------------------------------------*/ 481 /*---------------- Thread Management ----------------*/ 482 /*--------------------------------------------------------------*/ 483 484 /** Spawn process threads */ 485 private void spawnThreads(final SamStreamer ss){ 486 487 //Do anything necessary prior to processing 488 489 //Determine how many threads may be used 490 final int threads=Shared.threads(); 491 492 //Fill a list with ProcessThreads 493 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads); 494 for(int i=0; i<threads; i++){ 495 alpt.add(new ProcessThread(ss, i)); 496 } 497 498 //Start the threads 499 for(ProcessThread pt : alpt){ 500 pt.start(); 501 } 502 if(verbose){outstream.println("Started worker threads.");} 503 504 //Wait for threads to finish 505 boolean success=ThreadWaiter.waitForThreads(alpt, this); 506 errorState&=!success; 507 508 //Do anything necessary after processing 509 510 } 511 512 /** Spawn process threads */ 513 private void spawnThreads(final ConcurrentReadInputStream cris){ 514 515 //Do anything necessary prior to processing 516 517 //Determine how many threads may be used 518 final int threads=Shared.threads(); 519 520 //Fill a list with ProcessThreads 521 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads); 522 for(int i=0; i<threads; i++){ 523 alpt.add(new ProcessThread(cris, i)); 524 } 525 526 //Start the threads 527 for(ProcessThread pt : alpt){ 528 pt.start(); 529 } 530 if(verbose){outstream.println("Started worker threads.");} 531 532 //Wait for threads to finish 533 boolean success=ThreadWaiter.waitForThreads(alpt, this); 534 errorState&=!success; 535 536 //Do anything necessary after processing 537 ReadWrite.closeStreams(cris); 538 } 539 540 private void outputConsensus(ConcurrentReadOutputStream ros){ 541 if(verbose){outstream.println("Making consensus.");} 542 ArrayList<Read> consensusList=new ArrayList<Read>(200); 543 ArrayList<BaseGraph> graphList=new ArrayList<BaseGraph>(200); 544 long num=0; 545 for(Entry<String, BaseGraph> e : refMap.entrySet()){ 546 BaseGraph bg=e.getValue(); 547 graphList.add(bg); 548 549 Read r=bg.traverse(); 550 if(name!=null){r.id=name;} 551 consensusList.add(r); 552 553 refCount+=bg.refCount; 554 subCount+=bg.subCount; 555 delCount+=bg.delCount; 556 insCount+=bg.insCount; 557 558 readsOut++; 559 basesOut+=r.length(); 560 561 if(consensusList.size()>=200){ 562 if(ros!=null){ros.add(consensusList, num);} 563 consensusList=new ArrayList<Read>(200); 564 num++; 565 } 566 } 567 if(consensusList.size()>0){ 568 if(ros!=null){ros.add(consensusList, num);} 569 consensusList=new ArrayList<Read>(200); 570 num++; 571 } 572 if(graphList.size()>0 && ffmodel!=null){ 573 // ReadWrite.writeObjectInThread(graphList, ffmodel.name(), true); 574 ReadWrite.writeAsync(graphList, ffmodel.name(), true); 575 } 576 } 577 578 @Override 579 public final void accumulate(ProcessThread pt){ 580 readsProcessed+=pt.readsProcessedT; 581 basesProcessed+=pt.basesProcessedT; 582 alignedReads+=pt.alignedReadsT; 583 identitySum+=pt.identitySumT; 584 scoreSum+=pt.scoreSumT; 585 for(int i=0; i<idHist.length; i++){ 586 idHist[i]+=pt.idHistT[i]; 587 scoreHist[i]+=pt.scoreHistT[i]; 588 } 589 errorState|=(!pt.success); 590 } 591 592 @Override 593 public final boolean success(){return !errorState;} 594 595 /*--------------------------------------------------------------*/ 596 /*---------------- Inner Methods ----------------*/ 597 /*--------------------------------------------------------------*/ 598 599 @Override 600 public ByteBuilder toText() { 601 // TODO Auto-generated method stub 602 return null; 603 } 604 605 /*--------------------------------------------------------------*/ 606 /*---------------- Inner Classes ----------------*/ 607 /*--------------------------------------------------------------*/ 608 609 /** This class is static to prevent accidental writing to shared variables. 610 * It is safe to remove the static modifier. */ 611 class ProcessThread extends Thread { 612 613 //Constructor 614 ProcessThread(final SamStreamer ss_, final int tid_){ 615 ss=ss_; 616 cris=null; 617 tid=tid_; 618 realigner=(realign ? new Realigner() : null); 619 } 620 621 //Constructor 622 ProcessThread(final ConcurrentReadInputStream cris_, final int tid_){ 623 ss=null; 624 cris=cris_; 625 tid=tid_; 626 realigner=null;//(realign ? new Realigner() : null); 627 } 628 629 //Called by start() 630 @Override 631 public void run(){ 632 //Do anything necessary prior to processing 633 634 //Process the reads 635 processInner(); 636 637 //Do anything necessary after processing 638 639 //Indicate successful exit status 640 success=true; 641 } 642 643 /** Iterate through the reads */ 644 void processInner(){ 645 646 647 //Grab and process all lists 648 649 if(ss!=null){ 650 for(ListNum<Read> ln=ss.nextReads(); ln!=null; ln=ss.nextReads()){ 651 processList(ln); 652 } 653 }else{ 654 for(ListNum<Read> ln=cris.nextList(); ln!=null && ln.size()>0; ln=cris.nextList()){ 655 processList(ln); 656 cris.returnList(ln); 657 } 658 } 659 660 } 661 662 void processList(ListNum<Read> ln){ 663 664 //Loop through each read in the list 665 for(Read r : ln){ 666 667 //Validate reads in worker threads 668 if(!r.validated()){r.validate(true);} 669 670 //Track the initial length for statistics 671 final int initialLength=r.length(); 672 673 //Increment counters 674 readsProcessedT++; 675 basesProcessedT+=initialLength; 676 677 { 678 //Reads are processed in this block. 679 processRead(r); 680 } 681 } 682 } 683 684 /** 685 * Process a read or a read pair. 686 * @param r Read 1 687 * @param r2 Read 2 (may be null) 688 * @return True if the reads should be kept, false if they should be discarded. 689 */ 690 void processRead(final Read r){ 691 if(r.bases==null || r.length()<=1){return;} 692 693 final SamLine sl=r.samline; 694 if(sl!=null && !sl.mapped()){return;} 695 final String rname=(sl==null ? defaultRname : sl.rnameS()); 696 BaseGraph bg=refMap.get(rname); 697 if(bg==null){bg=refMap2.get(Tools.trimToWhitespace(rname));} 698 assert(bg!=null) : "Can't find graph for "+rname; 699 float identity; 700 float score=0;//unused 701 702 // assert(false) : r; 703 if(sl==null){ 704 // identity=SketchObject.alignAndMakeMatch(r, bg.original); 705 // assert(false) : bg.refWeights[0]; 706 // identity=SketchObject.alignAndMakeMatch(r, bg.original, bg.refWeights, bg.insWeights, bg.delWeights); 707 identity=SketchObject.alignAndMakeMatch(r, bg.original, bg.refWeights); 708 if(identity<=0){return;} 709 assert(r.match!=null) : identity+", "+r; 710 }else{ 711 assert(sl!=null && sl.mapped() && sl.seq!=null) : sl; 712 assert(r.match!=null); 713 if(samFilter!=null && !samFilter.passesFilter(sl)){return;} 714 identity=sl.calcIdentity(); 715 } 716 assert(r.match!=null && identity>0) : identity+", "+r; 717 718 if(inModel!=null){ 719 720 if(verbose){System.err.println(r.start+"\t"+r.stop+"\t"+new String(r.match));} 721 score=inModel.score(r, false, true); 722 if(printScores){ 723 // System.err.println(identity+"\t"+score+"\t"+inModel.score(r, false, true)); 724 System.err.println(String.format("%.5f\t%.5f", identity, score)); 725 } 726 // assert(false); 727 // if(score<0){ 728 // verbose=true; 729 // inModel.score(r, false, false); 730 // } 731 // assert(!verbose); 732 } 733 734 alignedReadsT++; 735 identitySumT+=identity; 736 scoreSumT+=score; 737 idHistT[Tools.mid(0, Math.round(100*identity), 100)]++; 738 scoreHistT[Tools.mid(0, Math.round(100*score), 100)]++; 739 740 741 742 // assert(sl.calcIdentity()<=0.78) : sl.calcIdentity()+", "+samFilter.maxId; 743 // assert(false) : sl.cigar+", "+sl.calcIdentity(); 744 745 // System.err.println(rname); 746 747 if(realigner!=null) { 748 assert(false); 749 realigner.realign(r, sl, bg.original, true); 750 } 751 752 bg.add(r); 753 } 754 755 long[] idHistT=new long[101]; 756 long[] scoreHistT=new long[101]; 757 758 /** Number of reads processed by this thread */ 759 protected long readsProcessedT=0; 760 /** Number of bases processed by this thread */ 761 protected long basesProcessedT=0; 762 763 protected long alignedReadsT=0; 764 765 double identitySumT=0; 766 double scoreSumT=0; 767 768 /** True only if this thread has completed successfully */ 769 boolean success=false; 770 771 /** Shared input stream */ 772 private final SamStreamer ss; 773 /** Alternate input stream */ 774 private final ConcurrentReadInputStream cris; 775 /** Thread ID */ 776 final int tid; 777 /** For realigning reads */ 778 final Realigner realigner; 779 } 780 781 /*--------------------------------------------------------------*/ 782 /*---------------- Fields ----------------*/ 783 /*--------------------------------------------------------------*/ 784 785 /** Read input file path */ 786 private String in=null; 787 /** Reference input file path */ 788 private String ref=null; 789 790 private String inModelFile; 791 private BaseGraph inModel; 792 private String outHistFile; 793 794 /** Consensus output file path */ 795 private String out=null; 796 /** Model output file path */ 797 private String outModel=null; 798 799 /** Override input file extension */ 800 private String extin=null; 801 /** Override output file extension */ 802 private String extout=null; 803 804 /*--------------------------------------------------------------*/ 805 806 /** Number of reads processed */ 807 protected long readsProcessed=0; 808 /** Number of bases processed */ 809 protected long basesProcessed=0; 810 811 protected long alignedReads=0; 812 813 protected double identitySum=0; 814 protected double scoreSum=0; 815 816 /** Number of reads retained */ 817 protected long readsOut=0; 818 /** Number of bases retained */ 819 protected long basesOut=0; 820 821 /** Quit after processing this many input reads; -1 means no limit */ 822 private long maxReads=-1; 823 824 public long subCount=0; 825 public long refCount=0; 826 public long delCount=0; 827 public long insCount=0; 828 829 long[] idHist=new long[101]; 830 long[] scoreHist=new long[101]; 831 boolean printScores=false; 832 833 /*--------------------------------------------------------------*/ 834 835 /** Threads dedicated to reading the sam file */ 836 private int streamerThreads=SamStreamer.DEFAULT_THREADS; 837 838 private String name=null; 839 840 private boolean loadedRef=false; 841 private boolean specialRef=false; 842 843 private boolean realign=false; 844 845 private int ploidy=1; 846 847 private final boolean silent; 848 849 public final SamFilter samFilter=new SamFilter(); 850 /** Uses full ref names */ 851 public LinkedHashMap<String, BaseGraph> refMap; 852 /** Uses truncated ref names */ 853 public LinkedHashMap<String, BaseGraph> refMap2; 854 855 private String defaultRname=null; 856 857 /*--------------------------------------------------------------*/ 858 /*---------------- Final Fields ----------------*/ 859 /*--------------------------------------------------------------*/ 860 861 /** Read input file */ 862 private final FileFormat ffin; 863 /** Reference input file */ 864 private final FileFormat ffref; 865 866 /** Consensus output file */ 867 private final FileFormat ffout; 868 /** Model output file */ 869 private final FileFormat ffmodel; 870 871 /*--------------------------------------------------------------*/ 872 /*---------------- Common Fields ----------------*/ 873 /*--------------------------------------------------------------*/ 874 875 /** Print status messages to this output stream */ 876 private PrintStream outstream=System.err; 877 /** True if an error was encountered */ 878 public boolean errorState=false; 879 /** Overwrite existing output files */ 880 private boolean overwrite=false; 881 /** Append to existing output files */ 882 private boolean append=false; 883 /** Reads ARE output in input order, even though this is false */ 884 private final boolean ordered=false; 885 886 } 887