1 package sketch; 2 3 import java.io.File; 4 import java.io.PrintStream; 5 import java.util.ArrayList; 6 import java.util.Arrays; 7 8 import dna.AminoAcid; 9 import fileIO.ByteFile; 10 import fileIO.FileFormat; 11 import fileIO.ReadWrite; 12 import shared.Parse; 13 import shared.Parser; 14 import shared.PreParser; 15 import shared.ReadStats; 16 import shared.Shared; 17 import shared.Timer; 18 import shared.Tools; 19 import stream.ConcurrentReadInputStream; 20 import stream.ConcurrentReadOutputStream; 21 import stream.FASTQ; 22 import stream.FastaReadInputStream; 23 import stream.Read; 24 import structures.ListNum; 25 26 /** 27 * 28 * @author Brian Bushnell 29 * @date July 25, 2018 30 * 31 */ 32 public class KmerLimit extends SketchObject { 33 34 /*--------------------------------------------------------------*/ 35 /*---------------- Initialization ----------------*/ 36 /*--------------------------------------------------------------*/ 37 38 /** 39 * Code entrance from the command line. 40 * @param args Command line arguments 41 */ main(String[] args)42 public static void main(String[] args){ 43 //Start a timer immediately upon code entrance. 44 Timer t=new Timer(); 45 46 //Create an instance of this class 47 KmerLimit x=new KmerLimit(args); 48 49 //Run the object 50 x.process(t); 51 52 //Close the print stream if it was redirected 53 Shared.closeStream(x.outstream); 54 } 55 56 /** 57 * Constructor. 58 * @param args Command line arguments 59 */ KmerLimit(String[] args)60 public KmerLimit(String[] args){ 61 62 {//Preparse block for help, config files, and outstream 63 PreParser pp=new PreParser(args, getClass(), false); 64 args=pp.args; 65 outstream=pp.outstream; 66 } 67 68 boolean setInterleaved=false; //Whether interleaved was explicitly set. 69 70 //Set shared static variables 71 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; 72 ReadWrite.MAX_ZIP_THREADS=Shared.threads(); 73 SketchObject.setKeyFraction(0.1); 74 defaultParams.minEntropy=0; 75 defaultParams.minProb=0.2f; 76 77 boolean setHeapSize=false; 78 int heapSize_=4095; 79 long targetKmers_=0; 80 int k_=32; 81 int minCount_=1; 82 83 //Create a parser object 84 Parser parser=new Parser(); 85 parser.overwrite=true; 86 87 //Parse each argument 88 for(int i=0; i<args.length; i++){ 89 String arg=args[i]; 90 91 //Break arguments into their constituent parts, in the form of "a=b" 92 String[] split=arg.split("="); 93 String a=split[0].toLowerCase(); 94 String b=split.length>1 ? split[1] : null; 95 if(b!=null && b.equalsIgnoreCase("null")){b=null;} 96 97 if(a.equals("verbose")){ 98 verbose=Parse.parseBoolean(b); 99 }else if(a.equals("ordered")){ 100 ordered=Parse.parseBoolean(b); 101 }else if(a.equals("shuffle")){ 102 shuffle=Parse.parseBoolean(b); 103 }else if(a.equals("size") || a.equals("heapsize")){ 104 heapSize_=Parse.parseIntKMG(b); 105 setHeapSize=true; 106 }else if(a.equals("kmers") || a.equals("target") || a.equals("limit")){ 107 targetKmers_=Parse.parseKMG(b); 108 }else if(a.equals("mincount")){ 109 minCount_=Parse.parseIntKMG(b); 110 }else if(parseSketchFlags(arg, a, b)){ 111 parser.parse(arg, a, b); 112 }else if(defaultParams.parse(arg, a, b)){ 113 parser.parse(arg, a, b); 114 }else if(a.equals("parse_flag_goes_here")){ 115 long fake_variable=Parse.parseKMG(b); 116 //Set a variable here 117 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser 118 //do nothing 119 }else{ 120 outstream.println("Unknown parameter "+args[i]); 121 assert(false) : "Unknown parameter "+args[i]; 122 } 123 } 124 125 if(!setHeapSize && minCount_>1){heapSize_=32000;} 126 heapSize=heapSize_; 127 targetKmers=targetKmers_; 128 k=k_; 129 minCount=minCount_; 130 assert(targetKmers>0) : "Must set a kmer limit."; 131 assert(heapSize>0) : "Heap size must be positive."; 132 assert(k>0 && k<=32) : "0<k<33; k="+k; 133 postParse(); 134 135 if(minCount>1){ 136 Shared.setBufferLen(800); 137 } 138 139 {//Process parser fields 140 Parser.processQuality(); 141 142 maxReads=parser.maxReads; 143 144 overwrite=ReadStats.overwrite=parser.overwrite; 145 append=ReadStats.append=parser.append; 146 setInterleaved=parser.setInterleaved; 147 148 in1=parser.in1; 149 in2=parser.in2; 150 qfin1=parser.qfin1; 151 qfin2=parser.qfin2; 152 153 out1=parser.out1; 154 out2=parser.out2; 155 qfout1=parser.qfout1; 156 qfout2=parser.qfout2; 157 158 extin=parser.extin; 159 extout=parser.extout; 160 } 161 162 //Do input file # replacement 163 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){ 164 in2=in1.replace("#", "2"); 165 in1=in1.replace("#", "1"); 166 } 167 168 //Do output file # replacement 169 if(out1!=null && out2==null && out1.indexOf('#')>-1){ 170 out2=out1.replace("#", "2"); 171 out1=out1.replace("#", "1"); 172 } 173 174 //Adjust interleaved detection based on the number of input files 175 if(in2!=null){ 176 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");} 177 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; 178 } 179 180 assert(FastaReadInputStream.settingsOK()); 181 182 //Ensure there is an input file 183 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} 184 185 //Adjust the number of threads for input file reading 186 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ 187 ByteFile.FORCE_MODE_BF2=true; 188 } 189 190 //Ensure out2 is not set without out1 191 if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");} 192 193 //Adjust interleaved settings based on number of output files 194 if(!setInterleaved){ 195 assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n"; 196 if(in2!=null){ //If there are 2 input streams. 197 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; 198 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); 199 }else{ //There is one input stream. 200 if(out2!=null){ 201 FASTQ.FORCE_INTERLEAVED=true; 202 FASTQ.TEST_INTERLEAVED=false; 203 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); 204 } 205 } 206 } 207 208 //Ensure output files can be written 209 if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){ 210 outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2); 211 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n"); 212 } 213 214 //Ensure input files can be read 215 if(!Tools.testInputFiles(false, true, in1, in2)){ 216 throw new RuntimeException("\nCan't read some input files.\n"); 217 } 218 219 //Ensure that no file was specified multiple times 220 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){ 221 throw new RuntimeException("\nSome file names were specified multiple times.\n"); 222 } 223 224 //Create output FileFormat objects 225 ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered); 226 ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, ordered); 227 228 //Create input FileFormat objects 229 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true); 230 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true); 231 232 minProb=defaultParams.minProb; 233 minQual=defaultParams.minQual; 234 235 shift=2*k; 236 shift2=shift-2; 237 mask=(shift>63 ? -1L : ~((-1L)<<shift)); //Conditional allows K=32 238 sharedHeap=new SketchHeap(heapSize, 0, minCount>1); 239 } 240 241 /*--------------------------------------------------------------*/ 242 /*---------------- Outer Methods ----------------*/ 243 /*--------------------------------------------------------------*/ 244 245 /** Create read streams and process all data */ process(Timer t)246 void process(Timer t){ 247 248 //Turn off read validation in the input threads to increase speed 249 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; 250 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; 251 252 //Create a read input stream 253 final ConcurrentReadInputStream cris; 254 { 255 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2); 256 cris.start(); //Start the stream 257 if(verbose){outstream.println("Started cris");} 258 } 259 boolean paired=cris.paired(); 260 if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));} 261 262 //Optionally create a read output stream 263 final ConcurrentReadOutputStream ros; 264 if(ffout1!=null){ 265 //Select output buffer size based on whether it needs to be ordered 266 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8); 267 268 //Notify user of output mode 269 if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){ 270 outstream.println("Writing interleaved."); 271 } 272 273 ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false); 274 ros.start(); //Start the stream 275 }else{ros=null;} 276 277 //Reset counters 278 readsProcessed=readsOut=0; 279 basesProcessed=basesOut=0; 280 281 //Process the reads in separate threads 282 spawnThreads(cris, ros); 283 284 if(verbose){outstream.println("Finished; closing streams.");} 285 286 //Write anything that was accumulated by ReadStats 287 errorState|=ReadStats.writeAll(); 288 //Close the read streams 289 errorState|=ReadWrite.closeStreams(cris, ros); 290 291 //Reset read validation 292 Read.VALIDATE_IN_CONSTRUCTOR=vic; 293 294 //Report timing and results 295 t.stop(); 296 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); 297 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false)); 298 String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8); 299 outstream.println("Unique Kmers Out: "+kstring); 300 301 // Sketch sk=new Sketch(sharedHeap, true, true, null); 302 // outstream.println(sk.genomeSizeEstimate()); 303 304 //Throw an exception of there was an error in a thread 305 if(errorState){ 306 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); 307 } 308 } 309 310 /** Spawn process threads */ 311 private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){ 312 313 //Do anything necessary prior to processing 314 315 //Determine how many threads may be used 316 final int threads=Tools.min(8, Shared.threads()); 317 318 //Fill a list with ProcessThreads 319 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads); 320 int tSize=heapSize/2; 321 for(int i=0; i<threads; i++){ 322 alpt.add(new ProcessThread(cris, ros, i, tSize)); 323 } 324 325 //Start the threads 326 for(ProcessThread pt : alpt){ 327 pt.start(); 328 } 329 330 //Wait for completion of all threads 331 boolean success=true; 332 for(ProcessThread pt : alpt){ 333 334 //Wait until this thread has terminated 335 while(pt.getState()!=Thread.State.TERMINATED){ 336 try { 337 //Attempt a join operation 338 pt.join(); 339 } catch (InterruptedException e) { 340 //Potentially handle this, if it is expected to occur 341 e.printStackTrace(); 342 } 343 } 344 345 //Accumulate per-thread statistics 346 readsProcessed+=pt.readsProcessedT; 347 basesProcessed+=pt.basesProcessedT; 348 readsOut+=pt.readsOutT; 349 basesOut+=pt.basesOutT; 350 success&=pt.success; 351 } 352 353 //Track whether any threads failed 354 if(!success){errorState=true;} 355 356 //Do anything necessary after processing 357 358 } 359 360 /*--------------------------------------------------------------*/ 361 /*---------------- Inner Methods ----------------*/ 362 /*--------------------------------------------------------------*/ 363 364 /*--------------------------------------------------------------*/ 365 /*---------------- Inner Classes ----------------*/ 366 /*--------------------------------------------------------------*/ 367 368 /** This class is static to prevent accidental writing to shared variables. 369 * It is safe to remove the static modifier. */ 370 private class ProcessThread extends Thread { 371 372 //Constructor 373 ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream ros_, final int tid_, final int size){ 374 cris=cris_; 375 ros=ros_; 376 tid=tid_; 377 localHeap=new SketchHeap(size, 0, minCount>1); 378 } 379 380 //Called by start() 381 @Override 382 public void run(){ 383 //Do anything necessary prior to processing 384 385 //Process the reads 386 processInner(); 387 388 //Do anything necessary after processing 389 390 //Indicate successful exit status 391 success=true; 392 } 393 394 /** Iterate through the reads */ 395 void processInner(){ 396 397 //Grab the first ListNum of reads 398 ListNum<Read> ln=cris.nextList(); 399 //Grab the actual read list from the ListNum 400 ArrayList<Read> reads=(ln!=null ? ln.list : null); 401 402 //Check to ensure pairing is as expected 403 if(reads!=null && !reads.isEmpty()){ 404 Read r=reads.get(0); 405 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access 406 } 407 408 //As long as there is a nonempty read list... 409 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning 410 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access 411 412 //Loop through each read in the list 413 for(int idx=0; idx<reads.size(); idx++){ 414 final Read r1=reads.get(idx); 415 final Read r2=r1.mate; 416 417 //Validate reads in worker threads 418 if(!r1.validated()){r1.validate(true);} 419 if(r2!=null && !r2.validated()){r2.validate(true);} 420 421 //Track the initial length for statistics 422 final int initialLength1=r1.length(); 423 final int initialLength2=r1.mateLength(); 424 425 //Increment counters 426 readsProcessedT+=r1.pairCount(); 427 basesProcessedT+=initialLength1+initialLength2; 428 429 //Reads are processed in this block. 430 processReadPair(r1, r2); 431 } 432 433 long count=dumpHeap(); 434 if(count>=targetKmers){break;} 435 436 for(Read r1 : reads){ 437 readsOutT+=r1.pairCount(); 438 basesOutT+=r1.pairLength(); 439 } 440 441 //Output reads to the output stream 442 if(ros!=null){ros.add(reads, ln.id);} 443 444 //Notify the input stream that the list was used 445 cris.returnList(ln); 446 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access 447 448 //Fetch a new list 449 ln=cris.nextList(); 450 reads=(ln!=null ? ln.list : null); 451 } 452 453 //Notify the input stream that the final list was used 454 if(ln!=null){ 455 if(ln.list!=null){ln.list.clear();} 456 cris.returnList(ln.id, true); 457 } 458 } 459 460 /** 461 * Process a read or a read pair. 462 * @param r1 Read 1 463 * @param r2 Read 2 (may be null) 464 */ 465 void processReadPair(final Read r1, final Read r2){ 466 processReadNucleotide(r1); 467 if(r2!=null){processReadNucleotide(r2);} 468 } 469 470 void processReadNucleotide(final Read r){ 471 final byte[] bases=r.bases; 472 final byte[] quals=r.quality; 473 long kmer=0; 474 long rkmer=0; 475 int len=0; 476 assert(!r.aminoacid()); 477 478 final long min=minHashValue; 479 localHeap.genomeSizeBases+=r.length(); 480 localHeap.genomeSequences++; 481 482 if(quals==null || (minProb<=0 && minQual<2)){ 483 for(int i=0; i<bases.length; i++){ 484 byte b=bases[i]; 485 long x=AminoAcid.baseToNumber[b]; 486 long x2=AminoAcid.baseToComplementNumber[b]; 487 488 kmer=((kmer<<2)|x)&mask; 489 rkmer=((rkmer>>>2)|(x2<<shift2))&mask; 490 491 if(x<0){len=0; rkmer=0;}else{len++;} 492 if(len>=k){ 493 localHeap.genomeSizeKmers++; 494 final long hashcode=hash(kmer, rkmer); 495 if(hashcode>min){localHeap.add(hashcode);} 496 } 497 } 498 }else{ 499 float prob=1; 500 for(int i=0; i<bases.length; i++){ 501 final byte b=bases[i]; 502 final long x=AminoAcid.baseToNumber[b]; 503 final long x2=AminoAcid.baseToComplementNumber[b]; 504 505 kmer=((kmer<<2)|x)&mask; 506 rkmer=((rkmer>>>2)|(x2<<shift2))&mask; 507 508 {//Quality-related stuff 509 final byte q=quals[i]; 510 assert(q>=0) : Arrays.toString(quals)+"\n"+minProb+", "+minQual; 511 prob=prob*align2.QualityTools.PROB_CORRECT[q]; 512 if(len>k){ 513 byte oldq=quals[i-k]; 514 prob=prob*align2.QualityTools.PROB_CORRECT_INVERSE[oldq]; 515 } 516 if(x<0 || q<minQual){ 517 len=0; 518 kmer=rkmer=0; 519 prob=1; 520 }else{ 521 len++; 522 } 523 } 524 525 if(len>=k && prob>=minProb){ 526 localHeap.genomeSizeKmers++; 527 localHeap.probSum+=prob; 528 final long hashcode=hash(kmer, rkmer); 529 if(hashcode>min){localHeap.checkAndAdd(hashcode);} 530 } 531 } 532 } 533 } 534 535 private long dumpHeap(){ 536 long count=0; 537 synchronized(sharedHeap){ 538 count=sharedHeap.genomeSizeEstimate(minCount); 539 if(count<targetKmers){ 540 sharedHeap.add(localHeap); 541 localHeap.clear(); 542 } 543 } 544 return count; 545 } 546 547 /** Number of reads processed by this thread */ 548 protected long readsProcessedT=0; 549 /** Number of bases processed by this thread */ 550 protected long basesProcessedT=0; 551 552 /** Number of reads retained by this thread */ 553 protected long readsOutT=0; 554 /** Number of bases retained by this thread */ 555 protected long basesOutT=0; 556 557 /** True only if this thread has completed successfully */ 558 boolean success=false; 559 560 /** Shared input stream */ 561 private final ConcurrentReadInputStream cris; 562 /** Shared output stream */ 563 private final ConcurrentReadOutputStream ros; 564 /** Thread ID */ 565 final int tid; 566 567 final SketchHeap localHeap; 568 } 569 570 /*--------------------------------------------------------------*/ 571 /*---------------- Fields ----------------*/ 572 /*--------------------------------------------------------------*/ 573 574 /** Primary input file path */ 575 private String in1=null; 576 /** Secondary input file path */ 577 private String in2=null; 578 579 private String qfin1=null; 580 private String qfin2=null; 581 582 /** Primary output file path */ 583 private String out1=null; 584 /** Secondary output file path */ 585 private String out2=null; 586 587 private String qfout1=null; 588 private String qfout2=null; 589 590 /** Override input file extension */ 591 private String extin=null; 592 /** Override output file extension */ 593 private String extout=null; 594 595 /*--------------------------------------------------------------*/ 596 597 /** Number of reads processed */ 598 protected long readsProcessed=0; 599 /** Number of bases processed */ 600 protected long basesProcessed=0; 601 602 /** Number of reads retained */ 603 protected long readsOut=0; 604 /** Number of bases retained */ 605 protected long basesOut=0; 606 607 /** Quit after processing this many input reads; -1 means no limit */ 608 private long maxReads=-1; 609 610 /*--------------------------------------------------------------*/ 611 /*---------------- Final Fields ----------------*/ 612 /*--------------------------------------------------------------*/ 613 614 /** Primary input file */ 615 private final FileFormat ffin1; 616 /** Secondary input file */ 617 private final FileFormat ffin2; 618 619 /** Primary output file */ 620 private final FileFormat ffout1; 621 /** Secondary output file */ 622 private final FileFormat ffout2; 623 624 private final SketchHeap sharedHeap; 625 private final int heapSize; 626 private final long targetKmers; 627 private final int minCount; 628 629 final int shift; 630 final int shift2; 631 final long mask; 632 633 final float minProb; 634 final byte minQual; 635 636 /*--------------------------------------------------------------*/ 637 /*---------------- Common Fields ----------------*/ 638 /*--------------------------------------------------------------*/ 639 640 /** Print status messages to this output stream */ 641 private PrintStream outstream=System.err; 642 /** Print verbose messages */ 643 public static boolean verbose=false; 644 /** True if an error was encountered */ 645 public boolean errorState=false; 646 /** Overwrite existing output files */ 647 private boolean overwrite=true; 648 /** Append to existing output files */ 649 private boolean append=false; 650 /** Reads are output in input order (not enabled) */ 651 private boolean ordered=false; 652 /** Shuffle input */ 653 private boolean shuffle=false; 654 655 } 656