1 package sketch;
2 
3 import java.io.File;
4 import java.io.PrintStream;
5 import java.util.ArrayList;
6 import java.util.Arrays;
7 import java.util.Locale;
8 import java.util.Random;
9 
10 import dna.AminoAcid;
11 import fileIO.ByteFile;
12 import fileIO.FileFormat;
13 import fileIO.ReadWrite;
14 import shared.Parse;
15 import shared.Parser;
16 import shared.PreParser;
17 import shared.ReadStats;
18 import shared.Shared;
19 import shared.Timer;
20 import shared.Tools;
21 import stream.ConcurrentReadInputStream;
22 import stream.ConcurrentReadOutputStream;
23 import stream.FASTQ;
24 import stream.FastaReadInputStream;
25 import stream.Read;
26 import structures.IntMap;
27 import structures.ListNum;
28 
29 /**
30  *
31  * @author Brian Bushnell
32  * @date July 30, 2018
33  *
34  */
35 public class KmerLimit2 extends SketchObject {
36 
37 	/*--------------------------------------------------------------*/
38 	/*----------------        Initialization        ----------------*/
39 	/*--------------------------------------------------------------*/
40 
41 	/**
42 	 * Code entrance from the command line.
43 	 * @param args Command line arguments
44 	 */
main(String[] args)45 	public static void main(String[] args){
46 		//Start a timer immediately upon code entrance.
47 		Timer t=new Timer();
48 
49 		//Create an instance of this class
50 		KmerLimit2 x=new KmerLimit2(args);
51 
52 		//Run the object
53 		x.process(t);
54 
55 		//Close the print stream if it was redirected
56 		Shared.closeStream(x.outstream);
57 	}
58 
59 	/**
60 	 * Constructor.
61 	 * @param args Command line arguments
62 	 */
KmerLimit2(String[] args)63 	public KmerLimit2(String[] args){
64 
65 		{//Preparse block for help, config files, and outstream
66 			PreParser pp=new PreParser(args, getClass(), false);
67 			args=pp.args;
68 			outstream=pp.outstream;
69 		}
70 
71 		boolean setInterleaved=false; //Whether interleaved was explicitly set.
72 
73 		//Set shared static variables
74 		ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
75 		ReadWrite.MAX_ZIP_THREADS=Shared.threads();
76 		SketchObject.setKeyFraction(0.1);
77 		defaultParams.minEntropy=0;
78 		defaultParams.minProb=0.2f;
79 
80 		boolean setHeapSize=false;
81 		int heapSize_=8091;
82 		long targetKmers_=0;
83 		int k_=32;
84 		int minCount_=1;
85 
86 		//Create a parser object
87 		Parser parser=new Parser();
88 		parser.overwrite=true;
89 
90 		//Parse each argument
91 		for(int i=0; i<args.length; i++){
92 			String arg=args[i];
93 
94 			//Break arguments into their constituent parts, in the form of "a=b"
95 			String[] split=arg.split("=");
96 			String a=split[0].toLowerCase();
97 			String b=split.length>1 ? split[1] : null;
98 			if(b!=null && b.equalsIgnoreCase("null")){b=null;}
99 
100 			if(a.equals("verbose")){
101 				verbose=Parse.parseBoolean(b);
102 			}else if(a.equals("ordered")){
103 				ordered=Parse.parseBoolean(b);
104 			}else if(a.equals("size") || a.equals("heapsize")){
105 				heapSize_=Parse.parseIntKMG(b);
106 				setHeapSize=true;
107 			}else if(a.equals("kmers") || a.equals("target") || a.equals("limit")){
108 				targetKmers_=Parse.parseKMG(b);
109 			}else if(a.equals("mincount")){
110 				minCount_=Parse.parseIntKMG(b);
111 			}else if(a.equals("maxexpandedlength") || a.equals("maxlength") || a.equals("maxlen")){
112 				maxExpandedLength=Parse.parseIntKMG(b);
113 			}else if(a.equals("seed")){
114 				seed=Parse.parseKMG(b);
115 			}else if(a.equals("trials")){
116 				trials=Parse.parseIntKMG(b);
117 			}else if(parseSketchFlags(arg, a, b)){
118 				parser.parse(arg, a, b);
119 			}else if(defaultParams.parse(arg, a, b)){
120 				parser.parse(arg, a, b);
121 			}else if(a.equals("parse_flag_goes_here")){
122 				long fake_variable=Parse.parseKMG(b);
123 				//Set a variable here
124 			}else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
125 				//do nothing
126 			}else{
127 				outstream.println("Unknown parameter "+args[i]);
128 				assert(false) : "Unknown parameter "+args[i];
129 			}
130 		}
131 
132 		if(!setHeapSize && minCount_>1){heapSize_=32000;}
133 		heapSize=heapSize_;
134 		targetKmers=targetKmers_;
135 		k=k_;
136 		minCount=minCount_;
137 		assert(targetKmers>0) : "Must set a kmer limit.";
138 		assert(heapSize>0) : "Heap size must be positive.";
139 		assert(k>0 && k<=32) : "0<k<33; k="+k;
140 		postParse();
141 
142 //		if(minCount>1){
143 //			Shared.setBufferLen(800);
144 //		}
145 
146 		{//Process parser fields
147 			Parser.processQuality();
148 
149 			maxReads=parser.maxReads;
150 
151 			overwrite=ReadStats.overwrite=parser.overwrite;
152 			append=ReadStats.append=parser.append;
153 			setInterleaved=parser.setInterleaved;
154 
155 			in1=parser.in1;
156 			in2=parser.in2;
157 			qfin1=parser.qfin1;
158 			qfin2=parser.qfin2;
159 
160 			out1=parser.out1;
161 			out2=parser.out2;
162 			qfout1=parser.qfout1;
163 			qfout2=parser.qfout2;
164 
165 			extin=parser.extin;
166 			extout=parser.extout;
167 		}
168 
169 		//Do input file # replacement
170 		if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
171 			in2=in1.replace("#", "2");
172 			in1=in1.replace("#", "1");
173 		}
174 
175 		//Do output file # replacement
176 		if(out1!=null && out2==null && out1.indexOf('#')>-1){
177 			out2=out1.replace("#", "2");
178 			out1=out1.replace("#", "1");
179 		}
180 
181 		//Adjust interleaved detection based on the number of input files
182 		if(in2!=null){
183 			if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
184 			FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
185 		}
186 
187 		assert(FastaReadInputStream.settingsOK());
188 
189 		//Ensure there is an input file
190 		if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
191 
192 		//Adjust the number of threads for input file reading
193 		if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
194 			ByteFile.FORCE_MODE_BF2=true;
195 		}
196 
197 		//Ensure out2 is not set without out1
198 		if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");}
199 
200 		//Adjust interleaved settings based on number of output files
201 		if(!setInterleaved){
202 			assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n";
203 			if(in2!=null){ //If there are 2 input streams.
204 				FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
205 				outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
206 			}else{ //There is one input stream.
207 				if(out2!=null){
208 					FASTQ.FORCE_INTERLEAVED=true;
209 					FASTQ.TEST_INTERLEAVED=false;
210 					outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
211 				}
212 			}
213 		}
214 
215 		//Ensure output files can be written
216 		if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){
217 			outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2);
218 			throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n");
219 		}
220 
221 		//Ensure input files can be read
222 		if(!Tools.testInputFiles(false, true, in1, in2)){
223 			throw new RuntimeException("\nCan't read some input files.\n");
224 		}
225 
226 		//Ensure that no file was specified multiple times
227 		if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){
228 			throw new RuntimeException("\nSome file names were specified multiple times.\n");
229 		}
230 
231 		//Create output FileFormat objects
232 		ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
233 		ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
234 
235 		//Create input FileFormat objects
236 		ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
237 		ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
238 
239 		minProb=defaultParams.minProb;
240 		minQual=defaultParams.minQual;
241 
242 		shift=2*k;
243 		shift2=shift-2;
244 		mask=(shift>63 ? -1L : ~((-1L)<<shift)); //Conditional allows K=32
245 		sharedHeap=new SketchHeap(heapSize, 0, true);
246 	}
247 
248 	/*--------------------------------------------------------------*/
249 	/*----------------         Outer Methods        ----------------*/
250 	/*--------------------------------------------------------------*/
251 
252 	/** Create read streams and process all data */
process(Timer t)253 	void process(Timer t){
254 
255 		//Turn off read validation in the input threads to increase speed
256 		final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
257 		Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
258 
259 //		//Optionally create a read output stream
260 //		final ConcurrentReadOutputStream ros;
261 //		if(ffout1!=null){
262 //			//Select output buffer size based on whether it needs to be ordered
263 //			final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
264 //
265 //			//Notify user of output mode
266 //			if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){
267 //				outstream.println("Writing interleaved.");
268 //			}
269 //
270 //			ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false);
271 //			ros.start(); //Start the stream
272 //		}else{ros=null;}
273 
274 		//Reset counters
275 		readsProcessed=readsOut=0;
276 		basesProcessed=basesOut=0;
277 
278 		//Process the reads in separate threads
279 		spawnThreads0();
280 
281 //		if(verbose){outstream.println("Finished; closing streams.");}
282 
283 		//Reset read validation
284 		Read.VALIDATE_IN_CONSTRUCTOR=vic;
285 
286 
287 		Sketch sketch=new Sketch(sharedHeap, true, true, null);
288 		sketch=capLengthAtCountSum(sketch, maxExpandedLength);
289 		final long reads=Tools.max(1, sketch.genomeSequences);
290 		final long targetReads=calcTargetReads(sketch, targetKmers, minCount, trials, seed);
291 		final double targetRate=Tools.min(1, targetReads/(double)reads);
292 		final String targetRateS=String.format(Locale.ROOT, "%.4f%%",targetRate*100);
293 
294 		//Report timing and results
295 		t.stop();
296 		outstream.println("Finished counting kmers.");
297 		outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
298 
299 		String kstring0=Tools.padKM(sketch.genomeSizeEstimate(minCount), 8);
300 		String rstring0=Tools.padKM(targetReads, 8);
301 		outstream.println("Unique Kmers:       "+kstring0);
302 		outstream.println("Target Reads:       "+rstring0+"\t"+targetRateS);
303 
304 //		outstream.println("Reads:        \t"+reads);
305 //		outstream.println("Unique Kmers: \t"+sketch.genomeSizeEstimate(minCount));
306 //		outstream.println("Target Reads: \t"+targetReads);
307 //		outstream.println("Sample Rate:  \t"+targetRateS);
308 //		outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
309 
310 		t.start();
311 		outstream.println("\nSubsampling reads.");
312 
313 //		String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8);
314 //		outstream.println("Unique Kmers Out:   "+kstring);
315 
316 
317 //		ArrayList<String> args=new ArrayList<String>();
318 //		args.add("in="+in1);
319 //		if(in2!=null){args.add("in2="+in2);}
320 //		args.add("out="+out1);
321 //		if(out2!=null){args.add("out2="+out2);}
322 //		args.add("ordered="+ordered);
323 //		args.add("ow="+(overwrite ? "t" : "f"));
324 //		if(targetRate<1){args.add("samplerate="+targetRateS);}
325 //		args.add("loglogout");
326 //		args.add("loglogk="+k);
327 //		args.add("loglogminprob="+minProb);
328 //		BBDukF.main(args.toArray(new String[0]));
329 
330 //		Sketch sk=new Sketch(sharedHeap, true, true, null);
331 //		outstream.println(sk.genomeSizeEstimate());
332 		spawnThreads2(targetRate);
333 		t.stop();
334 		outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
335 
336 		outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
337 		String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8);
338 		outstream.println("Unique Kmers Out:   "+kstring);
339 
340 		//Throw an exception of there was an error in a thread
341 		if(errorState){
342 			throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
343 		}
344 	}
345 
346 	/** Spawn process threads */
347 	private void spawnThreads0(){
348 
349 		//Create a read input stream
350 		final ConcurrentReadInputStream cris;
351 		{
352 			cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
353 			cris.start(); //Start the stream
354 			if(verbose){outstream.println("Started cris");}
355 		}
356 		paired=cris.paired();
357 		if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
358 
359 		//Determine how many threads may be used
360 		final int threads=Tools.min(10, Shared.threads());
361 
362 		//Fill a list with ProcessThreads
363 		ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
364 		for(int i=0; i<threads; i++){
365 			alpt.add(new ProcessThread(cris, null, i, heapSize));
366 		}
367 
368 		//Start the threads
369 		for(ProcessThread pt : alpt){
370 			pt.start();
371 		}
372 
373 		//Wait for completion of all threads
374 		boolean success=true;
375 		for(ProcessThread pt : alpt){
376 
377 			//Wait until this thread has terminated
378 			while(pt.getState()!=Thread.State.TERMINATED){
379 				try {
380 					//Attempt a join operation
381 					pt.join();
382 				} catch (InterruptedException e) {
383 					//Potentially handle this, if it is expected to occur
384 					e.printStackTrace();
385 				}
386 			}
387 
388 			//Accumulate per-thread statistics
389 			readsProcessed+=pt.readsProcessedT;
390 			basesProcessed+=pt.basesProcessedT;
391 			readsOut+=pt.readsOutT;
392 			basesOut+=pt.basesOutT;
393 			success&=pt.success;
394 		}
395 
396 		//Track whether any threads failed
397 		if(!success){errorState=true;}
398 
399 		//Do anything necessary after processing
400 
401 		//Close the read streams
402 		errorState|=ReadWrite.closeStreams(cris);
403 
404 	}
405 
406 	/** Spawn process threads */
407 	private void spawnThreads2(double rate){
408 
409 		//Create a read input stream
410 		final ConcurrentReadInputStream cris;
411 		{
412 			cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
413 			cris.setSampleRate((float)rate, seed);
414 			cris.start(); //Start the stream
415 			if(verbose){outstream.println("Started cris");}
416 		}
417 //		paired=cris.paired();
418 //		if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
419 
420 		//Optionally create a read output stream
421 		final ConcurrentReadOutputStream ros;
422 		if(ffout1!=null){
423 			//Select output buffer size based on whether it needs to be ordered
424 			final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
425 
426 			//Notify user of output mode
427 			if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){
428 				outstream.println("Writing interleaved.");
429 			}
430 
431 			ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false);
432 			ros.start(); //Start the stream
433 		}else{ros=null;}
434 
435 		//Determine how many threads may be used
436 		final int threads=Tools.min(10, Shared.threads());
437 
438 		sharedHeap.clear();
439 //		readsProcessed=0;
440 //		basesProcessed=0;
441 		readsOut=0;
442 		basesOut=0;
443 
444 		//Fill a list with ProcessThreads
445 		ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
446 		for(int i=0; i<threads; i++){
447 			alpt.add(new ProcessThread(cris, ros, i, heapSize));
448 		}
449 
450 		//Start the threads
451 		for(ProcessThread pt : alpt){
452 			pt.start();
453 		}
454 
455 		//Wait for completion of all threads
456 		boolean success=true;
457 		for(ProcessThread pt : alpt){
458 
459 			//Wait until this thread has terminated
460 			while(pt.getState()!=Thread.State.TERMINATED){
461 				try {
462 					//Attempt a join operation
463 					pt.join();
464 				} catch (InterruptedException e) {
465 					//Potentially handle this, if it is expected to occur
466 					e.printStackTrace();
467 				}
468 			}
469 
470 			//Accumulate per-thread statistics
471 //			readsProcessed+=pt.readsProcessedT;
472 //			basesProcessed+=pt.basesProcessedT;
473 			readsOut+=pt.readsOutT;
474 			basesOut+=pt.basesOutT;
475 			success&=pt.success;
476 		}
477 
478 		//Track whether any threads failed
479 		if(!success){errorState=true;}
480 
481 		//Do anything necessary after processing
482 
483 		//Write anything that was accumulated by ReadStats
484 		errorState|=ReadStats.writeAll();
485 		//Close the read streams
486 		errorState|=ReadWrite.closeStreams(cris, ros);
487 
488 	}
489 
490 	/*--------------------------------------------------------------*/
491 	/*----------------         Inner Methods        ----------------*/
492 	/*--------------------------------------------------------------*/
493 
494 	public static Sketch capLengthAtCountSum(Sketch sketch0, int max) {
495 		int len=0;
496 		long sum=0;
497 		for(; len<sketch0.keyCounts.length; len++){
498 			sum=sum+sketch0.keyCounts[len];
499 			if(sum>max){break;}
500 		}
501 		if(len>=sketch0.length()){return sketch0;}
502 
503 		long[] keys=Arrays.copyOf(sketch0.keys, len);
504 		int[] counts=Arrays.copyOf(sketch0.keyCounts, len);
505 
506 //		long[] array_, int[] counts_, int taxID_, long imgID_, long gSizeBases_, long gSizeKmers_, long gSequences_, double probCorrect_,
507 //		String taxName_, String name0_, String fname_, ArrayList<String> meta_
508 
509 		Sketch sk=new Sketch(keys, counts, null, null, null, -1, -1,
510 				sketch0.genomeSizeBases, sketch0.genomeSizeKmers, sketch0.genomeSequences, sketch0.probCorrect,
511 				null, null, null, null);
512 
513 		return sk;
514 	}
515 
516 	public static long calcTargetReads(Sketch sketch, long targetKmers, int minCount, int trials, long seed){
517 		final int[] counts0=sketch.keyCounts;
518 		final int[] counts=Arrays.copyOf(counts0, counts0.length);
519 		final long size=sketch.genomeSizeEstimate(minCount);
520 		final long reads=sketch.genomeSequences;
521 		final double targetKmerFraction=targetKmers/(double)size;
522 		if(targetKmerFraction>=1){return reads;}
523 
524 		final int targetKeys=(int)(targetKmerFraction*counts.length);
525 		final long countSum=Tools.sum(counts0);
526 		assert(countSum<Shared.MAX_ARRAY_LEN) : countSum;
527 //		System.err.println("countsum: "+countSum);
528 
529 		final IntMap map=new IntMap(0, counts0.length);
530 		final int[] expanded=new int[(int)countSum];
531 
532 		long roundSum=0;
533 		final Random randy=Shared.threadLocalRandom(seed);
534 		for(int i=0; i<trials; i++){
535 			Tools.fill(counts, counts0);
536 //			long rounds=reduceRounds(counts0, counts, minCount, targetKeys, randy);
537 			long rounds=reduceRoundsIM(counts0, expanded, minCount, targetKeys, randy, map);
538 			roundSum+=rounds;
539 		}
540 		double avgRounds=roundSum/(double)trials;
541 //		System.err.println("avgRounds: "+avgRounds);
542 		double targetCountFraction=1-(avgRounds/countSum);
543 //		System.err.println("targetFraction: "+targetCountFraction);
544 		return (long)(targetCountFraction*reads);
545 	}
546 
547 //	public static int reduceRoundsOld(final int[] counts, final int minCount, final int targetKeys, final Random randy){
548 //		assert(minCount>=0) : minCount;
549 //		int rounds=0;
550 //		int valid=0;
551 //		for(int x : counts){
552 //			if(x>=minCount){valid++;}
553 //		}
554 //
555 //		int len=counts.length;
556 //		System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)+", "+Arrays.toString(counts));
557 //		for(; valid>targetKeys; rounds++){
558 //			int pos=randy.nextInt(len);
559 ////			assert(counts[pos]>0) : pos+"/"+len+": "+targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Arrays.toString(counts);
560 //			if(counts[pos]==minCount){valid--;}
561 //			counts[pos]--;
562 //			if(counts[pos]==0){
563 //				len--;//shrink the array
564 //				System.err.println("len="+len+", counts[len]="+counts[len]);
565 //				System.err.println("pos="+pos+", counts[pos]="+counts[pos]);
566 //				counts[pos]=counts[len];//move the last element to the empty slot
567 //				counts[len]=0;
568 //				if(pos!=len && len>0){
569 //					assert(counts[pos]>0) : pos+"/"+len+": "+targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Arrays.toString(counts);
570 //				}
571 //			}
572 //			System.err.println(len+", "+pos+": "+Arrays.toString(counts));
573 //		}
574 //
575 //		System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts));
576 //
577 //		return rounds;
578 //	}
579 
580 	//This can be done faster with bins.
581 	//Each bin contains all kmers with count x.  When a bin is hit, one kmer moves to the next bin lower.
582 	//Alternately, expand the array into one physical kmer per count.  Store the current counts in an IntMap. Remove key each time.
583 	public static long reduceRounds(final int[] counts0, final int[] counts, final int minCount, final int targetKeys, final Random randy){
584 		assert(minCount>=0) : minCount;
585 		long rounds=0;
586 		int valid=0;
587 		for(int x : counts){
588 			if(x>=minCount){valid++;}
589 		}
590 
591 		int len=counts.length;
592 		final long sum0=Tools.sum(counts);
593 		long sum=sum0;
594 //		System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)+", "+Arrays.toString(counts));
595 		for(; valid>targetKeys; rounds++){
596 			long posNum=(Long.MAX_VALUE&randy.nextLong())%sum;
597 			long sum2=0;
598 			int pos=0;
599 
600 			for(int i=0; i<counts.length; i++){
601 				int x=counts[i];
602 				if(x>0){
603 					sum2+=x;
604 					if(sum2>=posNum){
605 						pos=i;
606 						break;
607 					}
608 				}
609 			}
610 
611 //			for(int i=0; i<counts0.length; i++){
612 //				int x=counts0[i];
613 //				if(x>0){
614 //					sum2+=x;
615 //					if(sum2>=posNum){
616 //						pos=i;
617 //						break;
618 //					}
619 //				}
620 //			}
621 
622 			sum--;
623 
624 			assert(counts[pos]>0) : pos+"/"+len+": "+targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Arrays.toString(counts);
625 			if(counts[pos]==minCount){valid--;}
626 			counts[pos]--;
627 			if(counts[pos]==0){
628 				len--;//shrink the array
629 			}
630 //			System.err.println(len+", "+pos+": "+Arrays.toString(counts));
631 		}
632 
633 //		System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts));
634 
635 		return rounds;
636 	}
637 
638 	//This can be done faster with bins.
639 	//Each bin contains all kmers with count x.  When a bin is hit, one kmer moves to the next bin lower.
640 	//Alternately, expand the array into one physical kmer per count.  Store the current counts in an IntMap. Remove key each time.
reduceRoundsIM(final int[] counts0, final int[] expanded, final int minCount, final int targetKeys, final Random randy, final IntMap map)641 	public static long reduceRoundsIM(final int[] counts0, final int[] expanded, final int minCount, final int targetKeys, final Random randy, final IntMap map){
642 		assert(minCount>=0) : minCount;
643 		long rounds=0;
644 		int valid=0;
645 		map.clear();
646 		for(int i=0, k=0; i<counts0.length; i++){
647 			int x=counts0[i];
648 //			counts[i]=counts0[i];
649 			if(x>=minCount){valid++;}
650 			map.put(i, x);
651 			for(int j=0; j<x; j++, k++){
652 				expanded[k]=i;
653 			}
654 		}
655 		assert(expanded.length==Tools.sum(counts0));
656 
657 		int len=expanded.length;
658 //		System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts)+", "+Arrays.toString(counts));
659 		for(; valid>targetKeys; rounds++){
660 			final int pos=randy.nextInt(len);
661 			final int key=expanded[pos];
662 			final int x=map.get(key);
663 			assert(x>0);
664 
665 
666 			if(x==minCount){valid--;}
667 			map.put(key, x-1);
668 
669 			len--;//shrink the array
670 			//				System.err.println("len="+len+", counts[len]="+counts[len]);
671 			//				System.err.println("pos="+pos+", counts[pos]="+counts[pos]);
672 			expanded[pos]=expanded[len];//move the last element to the empty slot
673 			expanded[len]=0;
674 
675 //			System.err.println(len+", "+pos+": "+Arrays.toString(counts));
676 		}
677 
678 //		System.err.println(targetKeys+", "+counts.length+", "+valid+", "+len+", "+rounds+", "+Tools.sum(counts));
679 
680 		return rounds;
681 	}
682 
683 	/*--------------------------------------------------------------*/
684 	/*----------------         Inner Classes        ----------------*/
685 	/*--------------------------------------------------------------*/
686 
687 	/** This class is static to prevent accidental writing to shared variables.
688 	 * It is safe to remove the static modifier. */
689 	private class ProcessThread extends Thread {
690 
691 		//Constructor
ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream ros_, final int tid_, final int size)692 		ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream ros_, final int tid_, final int size){
693 			cris=cris_;
694 			ros=ros_;
695 			tid=tid_;
696 			localHeap=new SketchHeap(size, 0, true);
697 		}
698 
699 		//Called by start()
700 		@Override
run()701 		public void run(){
702 			//Do anything necessary prior to processing
703 
704 			//Process the reads
705 			processInner();
706 
707 			//Do anything necessary after processing
708 			dumpHeap();
709 
710 			//Indicate successful exit status
711 			success=true;
712 		}
713 
714 		/** Iterate through the reads */
processInner()715 		void processInner(){
716 
717 			//Grab the first ListNum of reads
718 			ListNum<Read> ln=cris.nextList();
719 			//Grab the actual read list from the ListNum
720 			ArrayList<Read> reads=(ln!=null ? ln.list : null);
721 
722 			//Check to ensure pairing is as expected
723 			if(reads!=null && !reads.isEmpty()){
724 				Read r=reads.get(0);
725 //				assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
726 			}
727 
728 			//As long as there is a nonempty read list...
729 			while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
730 //				if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
731 
732 				//Loop through each read in the list
733 				for(int idx=0; idx<reads.size(); idx++){
734 					final Read r1=reads.get(idx);
735 					final Read r2=r1.mate;
736 
737 					//Validate reads in worker threads
738 					if(!r1.validated()){r1.validate(true);}
739 					if(r2!=null && !r2.validated()){r2.validate(true);}
740 
741 					//Track the initial length for statistics
742 					final int initialLength1=r1.length();
743 					final int initialLength2=r1.mateLength();
744 
745 					//Increment counters
746 					readsProcessedT+=r1.pairCount();
747 					basesProcessedT+=initialLength1+initialLength2;
748 
749 					//Reads are processed in this block.
750 					processReadPair(r1, r2);
751 				}
752 
753 				if(ros!=null){
754 					for(Read r1 : reads){
755 						readsOutT+=r1.pairCount();
756 						basesOutT+=r1.pairLength();
757 					}
758 
759 					//Output reads to the output stream
760 					if(ros!=null){ros.add(reads, ln.id);}
761 				}
762 
763 				//Notify the input stream that the list was used
764 				cris.returnList(ln);
765 //				if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
766 
767 				//Fetch a new list
768 				ln=cris.nextList();
769 				reads=(ln!=null ? ln.list : null);
770 			}
771 
772 			//Notify the input stream that the final list was used
773 			if(ln!=null){
774 				if(ln.list!=null){ln.list.clear();}
775 				cris.returnList(ln.id, true);
776 			}
777 		}
778 
779 		/**
780 		 * Process a read or a read pair.
781 		 * @param r1 Read 1
782 		 * @param r2 Read 2 (may be null)
783 		 */
processReadPair(final Read r1, final Read r2)784 		void processReadPair(final Read r1, final Read r2){
785 			processReadNucleotide(r1);
786 			if(r2!=null){processReadNucleotide(r2);}
787 		}
788 
processReadNucleotide(final Read r)789 		void processReadNucleotide(final Read r){
790 			final byte[] bases=r.bases;
791 			final byte[] quals=r.quality;
792 			long kmer=0;
793 			long rkmer=0;
794 			int len=0;
795 			assert(!r.aminoacid());
796 
797 			final long min=minHashValue;
798 			localHeap.genomeSizeBases+=r.length();
799 			localHeap.genomeSequences++;
800 
801 			if(quals==null || (minProb<=0 && minQual<2)){
802 				for(int i=0; i<bases.length; i++){
803 					byte b=bases[i];
804 					long x=AminoAcid.baseToNumber[b];
805 					long x2=AminoAcid.baseToComplementNumber[b];
806 
807 					kmer=((kmer<<2)|x)&mask;
808 					rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
809 
810 					if(x<0){len=0; rkmer=0;}else{len++;}
811 					if(len>=k){
812 						localHeap.genomeSizeKmers++;
813 						final long hashcode=hash(kmer, rkmer);
814 						if(hashcode>min){localHeap.add(hashcode);}
815 					}
816 				}
817 			}else{
818 				float prob=1;
819 				for(int i=0; i<bases.length; i++){
820 					final byte b=bases[i];
821 					final long x=AminoAcid.baseToNumber[b];
822 					final long x2=AminoAcid.baseToComplementNumber[b];
823 
824 					{//Quality-related stuff
825 						final byte q=quals[i];
826 						assert(q>=0) : Arrays.toString(quals)+"\n"+minProb+", "+minQual;
827 						prob=prob*align2.QualityTools.PROB_CORRECT[q];
828 						if(len>k){
829 							byte oldq=quals[i-k];
830 							prob=prob*align2.QualityTools.PROB_CORRECT_INVERSE[oldq];
831 						}
832 						if(x<0 || q<minQual){
833 							len=0;
834 							kmer=rkmer=0;
835 							prob=1;
836 						}else{
837 							len++;
838 						}
839 					}
840 
841 					kmer=((kmer<<2)|x)&mask;
842 					rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
843 
844 					if(len>=k && prob>=minProb){
845 						localHeap.genomeSizeKmers++;
846 						localHeap.probSum+=prob;
847 						final long hashcode=hash(kmer, rkmer);
848 						if(hashcode>min){localHeap.checkAndAdd(hashcode);}
849 					}
850 				}
851 			}
852 		}
853 
dumpHeap()854 		private void dumpHeap(){
855 			synchronized(sharedHeap){
856 				sharedHeap.add(localHeap);
857 			}
858 		}
859 
860 		/** Number of reads processed by this thread */
861 		protected long readsProcessedT=0;
862 		/** Number of bases processed by this thread */
863 		protected long basesProcessedT=0;
864 
865 		/** Number of reads retained by this thread */
866 		protected long readsOutT=0;
867 		/** Number of bases retained by this thread */
868 		protected long basesOutT=0;
869 
870 		/** True only if this thread has completed successfully */
871 		boolean success=false;
872 
873 		/** Shared input stream */
874 		private final ConcurrentReadInputStream cris;
875 		/** Shared output stream */
876 		private final ConcurrentReadOutputStream ros;
877 		/** Thread ID */
878 		final int tid;
879 
880 		final SketchHeap localHeap;
881 	}
882 
883 	/*--------------------------------------------------------------*/
884 	/*----------------            Fields            ----------------*/
885 	/*--------------------------------------------------------------*/
886 
887 	/** Primary input file path */
888 	private String in1=null;
889 	/** Secondary input file path */
890 	private String in2=null;
891 
892 	private String qfin1=null;
893 	private String qfin2=null;
894 
895 	/** Primary output file path */
896 	private String out1=null;
897 	/** Secondary output file path */
898 	private String out2=null;
899 
900 	private String qfout1=null;
901 	private String qfout2=null;
902 
903 	/** Override input file extension */
904 	private String extin=null;
905 	/** Override output file extension */
906 	private String extout=null;
907 
908 	/*--------------------------------------------------------------*/
909 
910 	/** Number of reads processed */
911 	protected long readsProcessed=0;
912 	/** Number of bases processed */
913 	protected long basesProcessed=0;
914 
915 	/** Number of reads retained */
916 	protected long readsOut=0;
917 	/** Number of bases retained */
918 	protected long basesOut=0;
919 
920 	/** Quit after processing this many input reads; -1 means no limit */
921 	private long maxReads=-1;
922 
923 	private boolean paired=false;
924 	private int trials=25;
925 	private long seed=-1;
926 	private int maxExpandedLength=50000000;
927 
928 	/*--------------------------------------------------------------*/
929 	/*----------------         Final Fields         ----------------*/
930 	/*--------------------------------------------------------------*/
931 
932 	/** Primary input file */
933 	private final FileFormat ffin1;
934 	/** Secondary input file */
935 	private final FileFormat ffin2;
936 
937 	/** Primary output file */
938 	private final FileFormat ffout1;
939 	/** Secondary output file */
940 	private final FileFormat ffout2;
941 
942 	private final SketchHeap sharedHeap;
943 	private final int heapSize;
944 	private final long targetKmers;
945 	private final int minCount;
946 
947 	final int shift;
948 	final int shift2;
949 	final long mask;
950 
951 	final float minProb;
952 	final byte minQual;
953 
954 	/*--------------------------------------------------------------*/
955 	/*----------------        Common Fields         ----------------*/
956 	/*--------------------------------------------------------------*/
957 
958 	/** Print status messages to this output stream */
959 	private PrintStream outstream=System.err;
960 	/** Print verbose messages */
961 	public static boolean verbose=false;
962 	/** True if an error was encountered */
963 	public boolean errorState=false;
964 	/** Overwrite existing output files */
965 	private boolean overwrite=true;
966 	/** Append to existing output files */
967 	private boolean append=false;
968 	/** Reads are output in input order (not enabled) */
969 	private boolean ordered=true;
970 
971 }
972