1 package sketch; 2 3 import java.io.File; 4 import java.io.PrintStream; 5 import java.util.ArrayList; 6 import java.util.Arrays; 7 import java.util.Locale; 8 import java.util.Random; 9 10 import dna.AminoAcid; 11 import fileIO.ByteFile; 12 import fileIO.FileFormat; 13 import fileIO.ReadWrite; 14 import shared.Parse; 15 import shared.Parser; 16 import shared.PreParser; 17 import shared.ReadStats; 18 import shared.Shared; 19 import shared.Timer; 20 import shared.Tools; 21 import stream.ConcurrentReadInputStream; 22 import stream.ConcurrentReadOutputStream; 23 import stream.FASTQ; 24 import stream.FastaReadInputStream; 25 import stream.Read; 26 import structures.IntMap; 27 import structures.ListNum; 28 29 /** 30 * 31 * @author Brian Bushnell 32 * @date July 30, 2018 33 * 34 */ 35 public class KmerLimit2 extends SketchObject { 36 37 /*--------------------------------------------------------------*/ 38 /*---------------- Initialization ----------------*/ 39 /*--------------------------------------------------------------*/ 40 41 /** 42 * Code entrance from the command line. 43 * @param args Command line arguments 44 */ main(String[] args)45 public static void main(String[] args){ 46 //Start a timer immediately upon code entrance. 47 Timer t=new Timer(); 48 49 //Create an instance of this class 50 KmerLimit2 x=new KmerLimit2(args); 51 52 //Run the object 53 x.process(t); 54 55 //Close the print stream if it was redirected 56 Shared.closeStream(x.outstream); 57 } 58 59 /** 60 * Constructor. 61 * @param args Command line arguments 62 */ KmerLimit2(String[] args)63 public KmerLimit2(String[] args){ 64 65 {//Preparse block for help, config files, and outstream 66 PreParser pp=new PreParser(args, getClass(), false); 67 args=pp.args; 68 outstream=pp.outstream; 69 } 70 71 boolean setInterleaved=false; //Whether interleaved was explicitly set. 72 73 //Set shared static variables 74 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; 75 ReadWrite.MAX_ZIP_THREADS=Shared.threads(); 76 SketchObject.setKeyFraction(0.1); 77 defaultParams.minEntropy=0; 78 defaultParams.minProb=0.2f; 79 80 boolean setHeapSize=false; 81 int heapSize_=8091; 82 long targetKmers_=0; 83 int k_=32; 84 int minCount_=1; 85 86 //Create a parser object 87 Parser parser=new Parser(); 88 parser.overwrite=true; 89 90 //Parse each argument 91 for(int i=0; i<args.length; i++){ 92 String arg=args[i]; 93 94 //Break arguments into their constituent parts, in the form of "a=b" 95 String[] split=arg.split("="); 96 String a=split[0].toLowerCase(); 97 String b=split.length>1 ? split[1] : null; 98 if(b!=null && b.equalsIgnoreCase("null")){b=null;} 99 100 if(a.equals("verbose")){ 101 verbose=Parse.parseBoolean(b); 102 }else if(a.equals("ordered")){ 103 ordered=Parse.parseBoolean(b); 104 }else if(a.equals("size") || a.equals("heapsize")){ 105 heapSize_=Parse.parseIntKMG(b); 106 setHeapSize=true; 107 }else if(a.equals("kmers") || a.equals("target") || a.equals("limit")){ 108 targetKmers_=Parse.parseKMG(b); 109 }else if(a.equals("mincount")){ 110 minCount_=Parse.parseIntKMG(b); 111 }else if(a.equals("maxexpandedlength") || a.equals("maxlength") || a.equals("maxlen")){ 112 maxExpandedLength=Parse.parseIntKMG(b); 113 }else if(a.equals("seed")){ 114 seed=Parse.parseKMG(b); 115 }else if(a.equals("trials")){ 116 trials=Parse.parseIntKMG(b); 117 }else if(parseSketchFlags(arg, a, b)){ 118 parser.parse(arg, a, b); 119 }else if(defaultParams.parse(arg, a, b)){ 120 parser.parse(arg, a, b); 121 }else if(a.equals("parse_flag_goes_here")){ 122 long fake_variable=Parse.parseKMG(b); 123 //Set a variable here 124 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser 125 //do nothing 126 }else{ 127 outstream.println("Unknown parameter "+args[i]); 128 assert(false) : "Unknown parameter "+args[i]; 129 } 130 } 131 132 if(!setHeapSize && minCount_>1){heapSize_=32000;} 133 heapSize=heapSize_; 134 targetKmers=targetKmers_; 135 k=k_; 136 minCount=minCount_; 137 assert(targetKmers>0) : "Must set a kmer limit."; 138 assert(heapSize>0) : "Heap size must be positive."; 139 assert(k>0 && k<=32) : "0<k<33; k="+k; 140 postParse(); 141 142 // if(minCount>1){ 143 // Shared.setBufferLen(800); 144 // } 145 146 {//Process parser fields 147 Parser.processQuality(); 148 149 maxReads=parser.maxReads; 150 151 overwrite=ReadStats.overwrite=parser.overwrite; 152 append=ReadStats.append=parser.append; 153 setInterleaved=parser.setInterleaved; 154 155 in1=parser.in1; 156 in2=parser.in2; 157 qfin1=parser.qfin1; 158 qfin2=parser.qfin2; 159 160 out1=parser.out1; 161 out2=parser.out2; 162 qfout1=parser.qfout1; 163 qfout2=parser.qfout2; 164 165 extin=parser.extin; 166 extout=parser.extout; 167 } 168 169 //Do input file # replacement 170 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){ 171 in2=in1.replace("#", "2"); 172 in1=in1.replace("#", "1"); 173 } 174 175 //Do output file # replacement 176 if(out1!=null && out2==null && out1.indexOf('#')>-1){ 177 out2=out1.replace("#", "2"); 178 out1=out1.replace("#", "1"); 179 } 180 181 //Adjust interleaved detection based on the number of input files 182 if(in2!=null){ 183 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");} 184 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; 185 } 186 187 assert(FastaReadInputStream.settingsOK()); 188 189 //Ensure there is an input file 190 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} 191 192 //Adjust the number of threads for input file reading 193 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ 194 ByteFile.FORCE_MODE_BF2=true; 195 } 196 197 //Ensure out2 is not set without out1 198 if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");} 199 200 //Adjust interleaved settings based on number of output files 201 if(!setInterleaved){ 202 assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n"; 203 if(in2!=null){ //If there are 2 input streams. 204 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; 205 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); 206 }else{ //There is one input stream. 207 if(out2!=null){ 208 FASTQ.FORCE_INTERLEAVED=true; 209 FASTQ.TEST_INTERLEAVED=false; 210 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); 211 } 212 } 213 } 214 215 //Ensure output files can be written 216 if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){ 217 outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2); 218 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n"); 219 } 220 221 //Ensure input files can be read 222 if(!Tools.testInputFiles(false, true, in1, in2)){ 223 throw new RuntimeException("\nCan't read some input files.\n"); 224 } 225 226 //Ensure that no file was specified multiple times 227 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){ 228 throw new RuntimeException("\nSome file names were specified multiple times.\n"); 229 } 230 231 //Create output FileFormat objects 232 ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered); 233 ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, ordered); 234 235 //Create input FileFormat objects 236 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true); 237 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true); 238 239 minProb=defaultParams.minProb; 240 minQual=defaultParams.minQual; 241 242 shift=2*k; 243 shift2=shift-2; 244 mask=(shift>63 ? -1L : ~((-1L)<<shift)); //Conditional allows K=32 245 sharedHeap=new SketchHeap(heapSize, 0, true); 246 } 247 248 /*--------------------------------------------------------------*/ 249 /*---------------- Outer Methods ----------------*/ 250 /*--------------------------------------------------------------*/ 251 252 /** Create read streams and process all data */ process(Timer t)253 void process(Timer t){ 254 255 //Turn off read validation in the input threads to increase speed 256 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; 257 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; 258 259 // //Optionally create a read output stream 260 // final ConcurrentReadOutputStream ros; 261 // if(ffout1!=null){ 262 // //Select output buffer size based on whether it needs to be ordered 263 // final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8); 264 // 265 // //Notify user of output mode 266 // if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){ 267 // outstream.println("Writing interleaved."); 268 // } 269 // 270 // ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false); 271 // ros.start(); //Start the stream 272 // }else{ros=null;} 273 274 //Reset counters 275 readsProcessed=readsOut=0; 276 basesProcessed=basesOut=0; 277 278 //Process the reads in separate threads 279 spawnThreads0(); 280 281 // if(verbose){outstream.println("Finished; closing streams.");} 282 283 //Reset read validation 284 Read.VALIDATE_IN_CONSTRUCTOR=vic; 285 286 287 Sketch sketch=new Sketch(sharedHeap, true, true, null); 288 sketch=capLengthAtCountSum(sketch, maxExpandedLength); 289 final long reads=Tools.max(1, sketch.genomeSequences); 290 final long targetReads=calcTargetReads(sketch, targetKmers, minCount, trials, seed); 291 final double targetRate=Tools.min(1, targetReads/(double)reads); 292 final String targetRateS=String.format(Locale.ROOT, "%.4f%%",targetRate*100); 293 294 //Report timing and results 295 t.stop(); 296 outstream.println("Finished counting kmers."); 297 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); 298 299 String kstring0=Tools.padKM(sketch.genomeSizeEstimate(minCount), 8); 300 String rstring0=Tools.padKM(targetReads, 8); 301 outstream.println("Unique Kmers: "+kstring0); 302 outstream.println("Target Reads: "+rstring0+"\t"+targetRateS); 303 304 // outstream.println("Reads: \t"+reads); 305 // outstream.println("Unique Kmers: \t"+sketch.genomeSizeEstimate(minCount)); 306 // outstream.println("Target Reads: \t"+targetReads); 307 // outstream.println("Sample Rate: \t"+targetRateS); 308 // outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false)); 309 310 t.start(); 311 outstream.println("\nSubsampling reads."); 312 313 // String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8); 314 // outstream.println("Unique Kmers Out: "+kstring); 315 316 317 // ArrayList<String> args=new ArrayList<String>(); 318 // args.add("in="+in1); 319 // if(in2!=null){args.add("in2="+in2);} 320 // args.add("out="+out1); 321 // if(out2!=null){args.add("out2="+out2);} 322 // args.add("ordered="+ordered); 323 // args.add("ow="+(overwrite ? "t" : "f")); 324 // if(targetRate<1){args.add("samplerate="+targetRateS);} 325 // args.add("loglogout"); 326 // args.add("loglogk="+k); 327 // args.add("loglogminprob="+minProb); 328 // BBDukF.main(args.toArray(new String[0])); 329 330 // Sketch sk=new Sketch(sharedHeap, true, true, null); 331 // outstream.println(sk.genomeSizeEstimate()); 332 spawnThreads2(targetRate); 333 t.stop(); 334 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); 335 336 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false)); 337 String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8); 338 outstream.println("Unique Kmers Out: "+kstring); 339 340 //Throw an exception of there was an error in a thread 341 if(errorState){ 342 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); 343 } 344 } 345 346 /** Spawn process threads */ 347 private void spawnThreads0(){ 348 349 //Create a read input stream 350 final ConcurrentReadInputStream cris; 351 { 352 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2); 353 cris.start(); //Start the stream 354 if(verbose){outstream.println("Started cris");} 355 } 356 paired=cris.paired(); 357 if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));} 358 359 //Determine how many threads may be used 360 final int threads=Tools.min(10, Shared.threads()); 361 362 //Fill a list with ProcessThreads 363 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads); 364 for(int i=0; i<threads; i++){ 365 alpt.add(new ProcessThread(cris, null, i, heapSize)); 366 } 367 368 //Start the threads 369 for(ProcessThread pt : alpt){ 370 pt.start(); 371 } 372 373 //Wait for completion of all threads 374 boolean success=true; 375 for(ProcessThread pt : alpt){ 376 377 //Wait until this thread has terminated 378 while(pt.getState()!=Thread.State.TERMINATED){ 379 try { 380 //Attempt a join operation 381 pt.join(); 382 } catch (InterruptedException e) { 383 //Potentially handle this, if it is expected to occur 384 e.printStackTrace(); 385 } 386 } 387 388 //Accumulate per-thread statistics 389 readsProcessed+=pt.readsProcessedT; 390 basesProcessed+=pt.basesProcessedT; 391 readsOut+=pt.readsOutT; 392 basesOut+=pt.basesOutT; 393 success&=pt.success; 394 } 395 396 //Track whether any threads failed 397 if(!success){errorState=true;} 398 399 //Do anything necessary after processing 400 401 //Close the read streams 402 errorState|=ReadWrite.closeStreams(cris); 403 404 } 405 406 /** Spawn process threads */ 407 private void spawnThreads2(double rate){ 408 409 //Create a read input stream 410 final ConcurrentReadInputStream cris; 411 { 412 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2); 413 cris.setSampleRate((float)rate, seed); 414 cris.start(); //Start the stream 415 if(verbose){outstream.println("Started cris");} 416 } 417 // paired=cris.paired(); 418 // if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));} 419 420 //Optionally create a read output stream 421 final ConcurrentReadOutputStream ros; 422 if(ffout1!=null){ 423 //Select output buffer size based on whether it needs to be ordered 424 final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8); 425 426 //Notify user of output mode 427 if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){ 428 outstream.println("Writing interleaved."); 429 } 430 431 ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false); 432 ros.start(); //Start the stream 433 }else{ros=null;} 434 435 //Determine how many threads may be used 436 final int threads=Tools.min(10, Shared.threads()); 437 438 sharedHeap.clear(); 439 // readsProcessed=0; 440 // basesProcessed=0; 441 readsOut=0; 442 basesOut=0; 443 444 //Fill a list with ProcessThreads 445 ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads); 446 for(int i=0; i<threads; i++){ 447 alpt.add(new ProcessThread(cris, ros, i, heapSize)); 448 } 449 450 //Start the threads 451 for(ProcessThread pt : alpt){ 452 pt.start(); 453 } 454 455 //Wait for completion of all threads 456 boolean success=true; 457 for(ProcessThread pt : alpt){ 458 459 //Wait until this thread has terminated 460 while(pt.getState()!=Thread.State.TERMINATED){ 461 try { 462 //Attempt a join operation 463 pt.join(); 464 } catch (InterruptedException e) { 465 //Potentially handle this, if it is expected to occur 466 e.printStackTrace(); 467 } 468 } 469 470 //Accumulate per-thread statistics 471 // readsProcessed+=pt.readsProcessedT; 472 // basesProcessed+=pt.basesProcessedT; 473 readsOut+=pt.readsOutT; 474 basesOut+=pt.basesOutT; 475 success&=pt.success; 476 } 477 478 //Track whether any threads failed 479 if(!success){errorState=true;} 480 481 //Do anything necessary after processing 482 483 //Write anything that was accumulated by ReadStats 484 errorState|=ReadStats.writeAll(); 485 //Close the read streams 486 errorState|=ReadWrite.closeStreams(cris, ros); 487 488 } 489 490 /*--------------------------------------------------------------*/ 491 /*---------------- Inner Methods ----------------*/ 492 /*--------------------------------------------------------------*/ 493 494 public static Sketch capLengthAtCountSum(Sketch sketch0, int max) { 495 int len=0; 496 long sum=0; 497 for(; len<sketch0.keyCounts.length; len++){ 498 sum=sum+sketch0.keyCounts[len]; 499 if(sum>max){break;} 500 } 501 if(len>=sketch0.length()){return sketch0;} 502 503 long[] keys=Arrays.copyOf(sketch0.keys, len); 504 int[] counts=Arrays.copyOf(sketch0.keyCounts, len); 505 506 // long[] array_, int[] counts_, int taxID_, long imgID_, long gSizeBases_, long gSizeKmers_, long gSequences_, double probCorrect_, 507 // String taxName_, String name0_, String fname_, ArrayList<String> meta_ 508 509 Sketch sk=new Sketch(keys, counts, null, null, null, -1, -1, 510 sketch0.genomeSizeBases, sketch0.genomeSizeKmers, sketch0.genomeSequences, sketch0.probCorrect, 511 null, null, null, null); 512 513 return sk; 514 } 515 516 public static long calcTargetReads(Sketch sketch, long targetKmers, int minCount, int trials, long seed){ 517 final int[] counts0=sketch.keyCounts; 518 final int[] counts=Arrays.copyOf(counts0, counts0.length); 519 final long size=sketch.genomeSizeEstimate(minCount); 520 final long reads=sketch.genomeSequences; 521 final double targetKmerFraction=targetKmers/(double)size; 522 if(targetKmerFraction>=1){return reads;} 523 524 final int targetKeys=(int)(targetKmerFraction*counts.length); 525 final long countSum=Tools.sum(counts0); 526 assert(countSum<Shared.MAX_ARRAY_LEN) : countSum; 527 // System.err.println("countsum: "+countSum); 528 529 final IntMap map=new IntMap(0, counts0.length); 530 final int[] expanded=new int[(int)countSum]; 531 532 long roundSum=0; 533 final Random randy=Shared.threadLocalRandom(seed); 534 for(int i=0; i<trials; i++){ 535 Tools.fill(counts, counts0); 536 // long rounds=reduceRounds(counts0, counts, minCount, targetKeys, randy); 537 long rounds=reduceRoundsIM(counts0, expanded, minCount, targetKeys, randy, map); 538 roundSum+=rounds; 539 } 540 double avgRounds=roundSum/(double)trials; 541 // System.err.println("avgRounds: "+avgRounds); 542 double targetCountFraction=1-(avgRounds/countSum); 543 // System.err.println("targetFraction: "+targetCountFraction); 544 return (long)(targetCountFraction*reads); 545 } 546 547 // public static int reduceRoundsOld(final int[] counts, final int minCount, final int targetKeys, final Random randy){ 548 // assert(minCount>=0) : minCount; 549 // int rounds=0; 550 // int valid=0; 551 // for(int x : counts){ 552 // if(x>=minCount){valid++;} 553 // } 554 // 555 // int len=counts.length; 556 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)+", "+Arrays.toString(counts)); 557 // for(; valid>targetKeys; rounds++){ 558 // int pos=randy.nextInt(len); 559 //// assert(counts[pos]>0) : pos+"/"+len+": "+targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Arrays.toString(counts); 560 // if(counts[pos]==minCount){valid--;} 561 // counts[pos]--; 562 // if(counts[pos]==0){ 563 // len--;//shrink the array 564 // System.err.println("len="+len+", counts[len]="+counts[len]); 565 // System.err.println("pos="+pos+", counts[pos]="+counts[pos]); 566 // counts[pos]=counts[len];//move the last element to the empty slot 567 // counts[len]=0; 568 // if(pos!=len && len>0){ 569 // assert(counts[pos]>0) : pos+"/"+len+": "+targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Arrays.toString(counts); 570 // } 571 // } 572 // System.err.println(len+", "+pos+": "+Arrays.toString(counts)); 573 // } 574 // 575 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)); 576 // 577 // return rounds; 578 // } 579 580 //This can be done faster with bins. 581 //Each bin contains all kmers with count x. When a bin is hit, one kmer moves to the next bin lower. 582 //Alternately, expand the array into one physical kmer per count. Store the current counts in an IntMap. Remove key each time. 583 public static long reduceRounds(final int[] counts0, final int[] counts, final int minCount, final int targetKeys, final Random randy){ 584 assert(minCount>=0) : minCount; 585 long rounds=0; 586 int valid=0; 587 for(int x : counts){ 588 if(x>=minCount){valid++;} 589 } 590 591 int len=counts.length; 592 final long sum0=Tools.sum(counts); 593 long sum=sum0; 594 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)+", "+Arrays.toString(counts)); 595 for(; valid>targetKeys; rounds++){ 596 long posNum=(Long.MAX_VALUE&randy.nextLong())%sum; 597 long sum2=0; 598 int pos=0; 599 600 for(int i=0; i<counts.length; i++){ 601 int x=counts[i]; 602 if(x>0){ 603 sum2+=x; 604 if(sum2>=posNum){ 605 pos=i; 606 break; 607 } 608 } 609 } 610 611 // for(int i=0; i<counts0.length; i++){ 612 // int x=counts0[i]; 613 // if(x>0){ 614 // sum2+=x; 615 // if(sum2>=posNum){ 616 // pos=i; 617 // break; 618 // } 619 // } 620 // } 621 622 sum--; 623 624 assert(counts[pos]>0) : pos+"/"+len+": "+targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Arrays.toString(counts); 625 if(counts[pos]==minCount){valid--;} 626 counts[pos]--; 627 if(counts[pos]==0){ 628 len--;//shrink the array 629 } 630 // System.err.println(len+", "+pos+": "+Arrays.toString(counts)); 631 } 632 633 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)); 634 635 return rounds; 636 } 637 638 //This can be done faster with bins. 639 //Each bin contains all kmers with count x. When a bin is hit, one kmer moves to the next bin lower. 640 //Alternately, expand the array into one physical kmer per count. Store the current counts in an IntMap. Remove key each time. reduceRoundsIM(final int[] counts0, final int[] expanded, final int minCount, final int targetKeys, final Random randy, final IntMap map)641 public static long reduceRoundsIM(final int[] counts0, final int[] expanded, final int minCount, final int targetKeys, final Random randy, final IntMap map){ 642 assert(minCount>=0) : minCount; 643 long rounds=0; 644 int valid=0; 645 map.clear(); 646 for(int i=0, k=0; i<counts0.length; i++){ 647 int x=counts0[i]; 648 // counts[i]=counts0[i]; 649 if(x>=minCount){valid++;} 650 map.put(i, x); 651 for(int j=0; j<x; j++, k++){ 652 expanded[k]=i; 653 } 654 } 655 assert(expanded.length==Tools.sum(counts0)); 656 657 int len=expanded.length; 658 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)+", "+Arrays.toString(counts)); 659 for(; valid>targetKeys; rounds++){ 660 final int pos=randy.nextInt(len); 661 final int key=expanded[pos]; 662 final int x=map.get(key); 663 assert(x>0); 664 665 666 if(x==minCount){valid--;} 667 map.put(key, x-1); 668 669 len--;//shrink the array 670 // System.err.println("len="+len+", counts[len]="+counts[len]); 671 // System.err.println("pos="+pos+", counts[pos]="+counts[pos]); 672 expanded[pos]=expanded[len];//move the last element to the empty slot 673 expanded[len]=0; 674 675 // System.err.println(len+", "+pos+": "+Arrays.toString(counts)); 676 } 677 678 // System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)); 679 680 return rounds; 681 } 682 683 /*--------------------------------------------------------------*/ 684 /*---------------- Inner Classes ----------------*/ 685 /*--------------------------------------------------------------*/ 686 687 /** This class is static to prevent accidental writing to shared variables. 688 * It is safe to remove the static modifier. */ 689 private class ProcessThread extends Thread { 690 691 //Constructor ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream ros_, final int tid_, final int size)692 ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream ros_, final int tid_, final int size){ 693 cris=cris_; 694 ros=ros_; 695 tid=tid_; 696 localHeap=new SketchHeap(size, 0, true); 697 } 698 699 //Called by start() 700 @Override run()701 public void run(){ 702 //Do anything necessary prior to processing 703 704 //Process the reads 705 processInner(); 706 707 //Do anything necessary after processing 708 dumpHeap(); 709 710 //Indicate successful exit status 711 success=true; 712 } 713 714 /** Iterate through the reads */ processInner()715 void processInner(){ 716 717 //Grab the first ListNum of reads 718 ListNum<Read> ln=cris.nextList(); 719 //Grab the actual read list from the ListNum 720 ArrayList<Read> reads=(ln!=null ? ln.list : null); 721 722 //Check to ensure pairing is as expected 723 if(reads!=null && !reads.isEmpty()){ 724 Read r=reads.get(0); 725 // assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access 726 } 727 728 //As long as there is a nonempty read list... 729 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning 730 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access 731 732 //Loop through each read in the list 733 for(int idx=0; idx<reads.size(); idx++){ 734 final Read r1=reads.get(idx); 735 final Read r2=r1.mate; 736 737 //Validate reads in worker threads 738 if(!r1.validated()){r1.validate(true);} 739 if(r2!=null && !r2.validated()){r2.validate(true);} 740 741 //Track the initial length for statistics 742 final int initialLength1=r1.length(); 743 final int initialLength2=r1.mateLength(); 744 745 //Increment counters 746 readsProcessedT+=r1.pairCount(); 747 basesProcessedT+=initialLength1+initialLength2; 748 749 //Reads are processed in this block. 750 processReadPair(r1, r2); 751 } 752 753 if(ros!=null){ 754 for(Read r1 : reads){ 755 readsOutT+=r1.pairCount(); 756 basesOutT+=r1.pairLength(); 757 } 758 759 //Output reads to the output stream 760 if(ros!=null){ros.add(reads, ln.id);} 761 } 762 763 //Notify the input stream that the list was used 764 cris.returnList(ln); 765 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access 766 767 //Fetch a new list 768 ln=cris.nextList(); 769 reads=(ln!=null ? ln.list : null); 770 } 771 772 //Notify the input stream that the final list was used 773 if(ln!=null){ 774 if(ln.list!=null){ln.list.clear();} 775 cris.returnList(ln.id, true); 776 } 777 } 778 779 /** 780 * Process a read or a read pair. 781 * @param r1 Read 1 782 * @param r2 Read 2 (may be null) 783 */ processReadPair(final Read r1, final Read r2)784 void processReadPair(final Read r1, final Read r2){ 785 processReadNucleotide(r1); 786 if(r2!=null){processReadNucleotide(r2);} 787 } 788 processReadNucleotide(final Read r)789 void processReadNucleotide(final Read r){ 790 final byte[] bases=r.bases; 791 final byte[] quals=r.quality; 792 long kmer=0; 793 long rkmer=0; 794 int len=0; 795 assert(!r.aminoacid()); 796 797 final long min=minHashValue; 798 localHeap.genomeSizeBases+=r.length(); 799 localHeap.genomeSequences++; 800 801 if(quals==null || (minProb<=0 && minQual<2)){ 802 for(int i=0; i<bases.length; i++){ 803 byte b=bases[i]; 804 long x=AminoAcid.baseToNumber[b]; 805 long x2=AminoAcid.baseToComplementNumber[b]; 806 807 kmer=((kmer<<2)|x)&mask; 808 rkmer=((rkmer>>>2)|(x2<<shift2))&mask; 809 810 if(x<0){len=0; rkmer=0;}else{len++;} 811 if(len>=k){ 812 localHeap.genomeSizeKmers++; 813 final long hashcode=hash(kmer, rkmer); 814 if(hashcode>min){localHeap.add(hashcode);} 815 } 816 } 817 }else{ 818 float prob=1; 819 for(int i=0; i<bases.length; i++){ 820 final byte b=bases[i]; 821 final long x=AminoAcid.baseToNumber[b]; 822 final long x2=AminoAcid.baseToComplementNumber[b]; 823 824 {//Quality-related stuff 825 final byte q=quals[i]; 826 assert(q>=0) : Arrays.toString(quals)+"\n"+minProb+", "+minQual; 827 prob=prob*align2.QualityTools.PROB_CORRECT[q]; 828 if(len>k){ 829 byte oldq=quals[i-k]; 830 prob=prob*align2.QualityTools.PROB_CORRECT_INVERSE[oldq]; 831 } 832 if(x<0 || q<minQual){ 833 len=0; 834 kmer=rkmer=0; 835 prob=1; 836 }else{ 837 len++; 838 } 839 } 840 841 kmer=((kmer<<2)|x)&mask; 842 rkmer=((rkmer>>>2)|(x2<<shift2))&mask; 843 844 if(len>=k && prob>=minProb){ 845 localHeap.genomeSizeKmers++; 846 localHeap.probSum+=prob; 847 final long hashcode=hash(kmer, rkmer); 848 if(hashcode>min){localHeap.checkAndAdd(hashcode);} 849 } 850 } 851 } 852 } 853 dumpHeap()854 private void dumpHeap(){ 855 synchronized(sharedHeap){ 856 sharedHeap.add(localHeap); 857 } 858 } 859 860 /** Number of reads processed by this thread */ 861 protected long readsProcessedT=0; 862 /** Number of bases processed by this thread */ 863 protected long basesProcessedT=0; 864 865 /** Number of reads retained by this thread */ 866 protected long readsOutT=0; 867 /** Number of bases retained by this thread */ 868 protected long basesOutT=0; 869 870 /** True only if this thread has completed successfully */ 871 boolean success=false; 872 873 /** Shared input stream */ 874 private final ConcurrentReadInputStream cris; 875 /** Shared output stream */ 876 private final ConcurrentReadOutputStream ros; 877 /** Thread ID */ 878 final int tid; 879 880 final SketchHeap localHeap; 881 } 882 883 /*--------------------------------------------------------------*/ 884 /*---------------- Fields ----------------*/ 885 /*--------------------------------------------------------------*/ 886 887 /** Primary input file path */ 888 private String in1=null; 889 /** Secondary input file path */ 890 private String in2=null; 891 892 private String qfin1=null; 893 private String qfin2=null; 894 895 /** Primary output file path */ 896 private String out1=null; 897 /** Secondary output file path */ 898 private String out2=null; 899 900 private String qfout1=null; 901 private String qfout2=null; 902 903 /** Override input file extension */ 904 private String extin=null; 905 /** Override output file extension */ 906 private String extout=null; 907 908 /*--------------------------------------------------------------*/ 909 910 /** Number of reads processed */ 911 protected long readsProcessed=0; 912 /** Number of bases processed */ 913 protected long basesProcessed=0; 914 915 /** Number of reads retained */ 916 protected long readsOut=0; 917 /** Number of bases retained */ 918 protected long basesOut=0; 919 920 /** Quit after processing this many input reads; -1 means no limit */ 921 private long maxReads=-1; 922 923 private boolean paired=false; 924 private int trials=25; 925 private long seed=-1; 926 private int maxExpandedLength=50000000; 927 928 /*--------------------------------------------------------------*/ 929 /*---------------- Final Fields ----------------*/ 930 /*--------------------------------------------------------------*/ 931 932 /** Primary input file */ 933 private final FileFormat ffin1; 934 /** Secondary input file */ 935 private final FileFormat ffin2; 936 937 /** Primary output file */ 938 private final FileFormat ffout1; 939 /** Secondary output file */ 940 private final FileFormat ffout2; 941 942 private final SketchHeap sharedHeap; 943 private final int heapSize; 944 private final long targetKmers; 945 private final int minCount; 946 947 final int shift; 948 final int shift2; 949 final long mask; 950 951 final float minProb; 952 final byte minQual; 953 954 /*--------------------------------------------------------------*/ 955 /*---------------- Common Fields ----------------*/ 956 /*--------------------------------------------------------------*/ 957 958 /** Print status messages to this output stream */ 959 private PrintStream outstream=System.err; 960 /** Print verbose messages */ 961 public static boolean verbose=false; 962 /** True if an error was encountered */ 963 public boolean errorState=false; 964 /** Overwrite existing output files */ 965 private boolean overwrite=true; 966 /** Append to existing output files */ 967 private boolean append=false; 968 /** Reads are output in input order (not enabled) */ 969 private boolean ordered=true; 970 971 } 972