1 package sketch;
2 
3 import java.io.File;
4 import java.io.PrintStream;
5 import java.util.ArrayList;
6 import java.util.Arrays;
7 
8 import dna.AminoAcid;
9 import fileIO.ByteFile;
10 import fileIO.FileFormat;
11 import fileIO.ReadWrite;
12 import shared.Parse;
13 import shared.Parser;
14 import shared.PreParser;
15 import shared.ReadStats;
16 import shared.Shared;
17 import shared.Timer;
18 import shared.Tools;
19 import stream.ConcurrentReadInputStream;
20 import stream.ConcurrentReadOutputStream;
21 import stream.FASTQ;
22 import stream.FastaReadInputStream;
23 import stream.Read;
24 import structures.ListNum;
25 
26 /**
27  *
28  * @author Brian Bushnell
29  * @date July 25, 2018
30  *
31  */
32 public class KmerLimit extends SketchObject {
33 
34 	/*--------------------------------------------------------------*/
35 	/*----------------        Initialization        ----------------*/
36 	/*--------------------------------------------------------------*/
37 
38 	/**
39 	 * Code entrance from the command line.
40 	 * @param args Command line arguments
41 	 */
main(String[] args)42 	public static void main(String[] args){
43 		//Start a timer immediately upon code entrance.
44 		Timer t=new Timer();
45 
46 		//Create an instance of this class
47 		KmerLimit x=new KmerLimit(args);
48 
49 		//Run the object
50 		x.process(t);
51 
52 		//Close the print stream if it was redirected
53 		Shared.closeStream(x.outstream);
54 	}
55 
56 	/**
57 	 * Constructor.
58 	 * @param args Command line arguments
59 	 */
KmerLimit(String[] args)60 	public KmerLimit(String[] args){
61 
62 		{//Preparse block for help, config files, and outstream
63 			PreParser pp=new PreParser(args, getClass(), false);
64 			args=pp.args;
65 			outstream=pp.outstream;
66 		}
67 
68 		boolean setInterleaved=false; //Whether interleaved was explicitly set.
69 
70 		//Set shared static variables
71 		ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
72 		ReadWrite.MAX_ZIP_THREADS=Shared.threads();
73 		SketchObject.setKeyFraction(0.1);
74 		defaultParams.minEntropy=0;
75 		defaultParams.minProb=0.2f;
76 
77 		boolean setHeapSize=false;
78 		int heapSize_=4095;
79 		long targetKmers_=0;
80 		int k_=32;
81 		int minCount_=1;
82 
83 		//Create a parser object
84 		Parser parser=new Parser();
85 		parser.overwrite=true;
86 
87 		//Parse each argument
88 		for(int i=0; i<args.length; i++){
89 			String arg=args[i];
90 
91 			//Break arguments into their constituent parts, in the form of "a=b"
92 			String[] split=arg.split("=");
93 			String a=split[0].toLowerCase();
94 			String b=split.length>1 ? split[1] : null;
95 			if(b!=null && b.equalsIgnoreCase("null")){b=null;}
96 
97 			if(a.equals("verbose")){
98 				verbose=Parse.parseBoolean(b);
99 			}else if(a.equals("ordered")){
100 				ordered=Parse.parseBoolean(b);
101 			}else if(a.equals("shuffle")){
102 				shuffle=Parse.parseBoolean(b);
103 			}else if(a.equals("size") || a.equals("heapsize")){
104 				heapSize_=Parse.parseIntKMG(b);
105 				setHeapSize=true;
106 			}else if(a.equals("kmers") || a.equals("target") || a.equals("limit")){
107 				targetKmers_=Parse.parseKMG(b);
108 			}else if(a.equals("mincount")){
109 				minCount_=Parse.parseIntKMG(b);
110 			}else if(parseSketchFlags(arg, a, b)){
111 				parser.parse(arg, a, b);
112 			}else if(defaultParams.parse(arg, a, b)){
113 				parser.parse(arg, a, b);
114 			}else if(a.equals("parse_flag_goes_here")){
115 				long fake_variable=Parse.parseKMG(b);
116 				//Set a variable here
117 			}else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
118 				//do nothing
119 			}else{
120 				outstream.println("Unknown parameter "+args[i]);
121 				assert(false) : "Unknown parameter "+args[i];
122 			}
123 		}
124 
125 		if(!setHeapSize && minCount_>1){heapSize_=32000;}
126 		heapSize=heapSize_;
127 		targetKmers=targetKmers_;
128 		k=k_;
129 		minCount=minCount_;
130 		assert(targetKmers>0) : "Must set a kmer limit.";
131 		assert(heapSize>0) : "Heap size must be positive.";
132 		assert(k>0 && k<=32) : "0<k<33; k="+k;
133 		postParse();
134 
135 		if(minCount>1){
136 			Shared.setBufferLen(800);
137 		}
138 
139 		{//Process parser fields
140 			Parser.processQuality();
141 
142 			maxReads=parser.maxReads;
143 
144 			overwrite=ReadStats.overwrite=parser.overwrite;
145 			append=ReadStats.append=parser.append;
146 			setInterleaved=parser.setInterleaved;
147 
148 			in1=parser.in1;
149 			in2=parser.in2;
150 			qfin1=parser.qfin1;
151 			qfin2=parser.qfin2;
152 
153 			out1=parser.out1;
154 			out2=parser.out2;
155 			qfout1=parser.qfout1;
156 			qfout2=parser.qfout2;
157 
158 			extin=parser.extin;
159 			extout=parser.extout;
160 		}
161 
162 		//Do input file # replacement
163 		if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
164 			in2=in1.replace("#", "2");
165 			in1=in1.replace("#", "1");
166 		}
167 
168 		//Do output file # replacement
169 		if(out1!=null && out2==null && out1.indexOf('#')>-1){
170 			out2=out1.replace("#", "2");
171 			out1=out1.replace("#", "1");
172 		}
173 
174 		//Adjust interleaved detection based on the number of input files
175 		if(in2!=null){
176 			if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
177 			FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
178 		}
179 
180 		assert(FastaReadInputStream.settingsOK());
181 
182 		//Ensure there is an input file
183 		if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
184 
185 		//Adjust the number of threads for input file reading
186 		if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
187 			ByteFile.FORCE_MODE_BF2=true;
188 		}
189 
190 		//Ensure out2 is not set without out1
191 		if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");}
192 
193 		//Adjust interleaved settings based on number of output files
194 		if(!setInterleaved){
195 			assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n";
196 			if(in2!=null){ //If there are 2 input streams.
197 				FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
198 				outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
199 			}else{ //There is one input stream.
200 				if(out2!=null){
201 					FASTQ.FORCE_INTERLEAVED=true;
202 					FASTQ.TEST_INTERLEAVED=false;
203 					outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
204 				}
205 			}
206 		}
207 
208 		//Ensure output files can be written
209 		if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){
210 			outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2);
211 			throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n");
212 		}
213 
214 		//Ensure input files can be read
215 		if(!Tools.testInputFiles(false, true, in1, in2)){
216 			throw new RuntimeException("\nCan't read some input files.\n");
217 		}
218 
219 		//Ensure that no file was specified multiple times
220 		if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){
221 			throw new RuntimeException("\nSome file names were specified multiple times.\n");
222 		}
223 
224 		//Create output FileFormat objects
225 		ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
226 		ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
227 
228 		//Create input FileFormat objects
229 		ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
230 		ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
231 
232 		minProb=defaultParams.minProb;
233 		minQual=defaultParams.minQual;
234 
235 		shift=2*k;
236 		shift2=shift-2;
237 		mask=(shift>63 ? -1L : ~((-1L)<<shift)); //Conditional allows K=32
238 		sharedHeap=new SketchHeap(heapSize, 0, minCount>1);
239 	}
240 
241 	/*--------------------------------------------------------------*/
242 	/*----------------         Outer Methods        ----------------*/
243 	/*--------------------------------------------------------------*/
244 
245 	/** Create read streams and process all data */
process(Timer t)246 	void process(Timer t){
247 
248 		//Turn off read validation in the input threads to increase speed
249 		final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
250 		Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
251 
252 		//Create a read input stream
253 		final ConcurrentReadInputStream cris;
254 		{
255 			cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
256 			cris.start(); //Start the stream
257 			if(verbose){outstream.println("Started cris");}
258 		}
259 		boolean paired=cris.paired();
260 		if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
261 
262 		//Optionally create a read output stream
263 		final ConcurrentReadOutputStream ros;
264 		if(ffout1!=null){
265 			//Select output buffer size based on whether it needs to be ordered
266 			final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
267 
268 			//Notify user of output mode
269 			if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){
270 				outstream.println("Writing interleaved.");
271 			}
272 
273 			ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false);
274 			ros.start(); //Start the stream
275 		}else{ros=null;}
276 
277 		//Reset counters
278 		readsProcessed=readsOut=0;
279 		basesProcessed=basesOut=0;
280 
281 		//Process the reads in separate threads
282 		spawnThreads(cris, ros);
283 
284 		if(verbose){outstream.println("Finished; closing streams.");}
285 
286 		//Write anything that was accumulated by ReadStats
287 		errorState|=ReadStats.writeAll();
288 		//Close the read streams
289 		errorState|=ReadWrite.closeStreams(cris, ros);
290 
291 		//Reset read validation
292 		Read.VALIDATE_IN_CONSTRUCTOR=vic;
293 
294 		//Report timing and results
295 		t.stop();
296 		outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
297 		outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
298 		String kstring=Tools.padKM(sharedHeap.genomeSizeEstimate(minCount), 8);
299 		outstream.println("Unique Kmers Out:   "+kstring);
300 
301 //		Sketch sk=new Sketch(sharedHeap, true, true, null);
302 //		outstream.println(sk.genomeSizeEstimate());
303 
304 		//Throw an exception of there was an error in a thread
305 		if(errorState){
306 			throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
307 		}
308 	}
309 
310 	/** Spawn process threads */
311 	private void spawnThreads(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){
312 
313 		//Do anything necessary prior to processing
314 
315 		//Determine how many threads may be used
316 		final int threads=Tools.min(8, Shared.threads());
317 
318 		//Fill a list with ProcessThreads
319 		ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
320 		int tSize=heapSize/2;
321 		for(int i=0; i<threads; i++){
322 			alpt.add(new ProcessThread(cris, ros, i, tSize));
323 		}
324 
325 		//Start the threads
326 		for(ProcessThread pt : alpt){
327 			pt.start();
328 		}
329 
330 		//Wait for completion of all threads
331 		boolean success=true;
332 		for(ProcessThread pt : alpt){
333 
334 			//Wait until this thread has terminated
335 			while(pt.getState()!=Thread.State.TERMINATED){
336 				try {
337 					//Attempt a join operation
338 					pt.join();
339 				} catch (InterruptedException e) {
340 					//Potentially handle this, if it is expected to occur
341 					e.printStackTrace();
342 				}
343 			}
344 
345 			//Accumulate per-thread statistics
346 			readsProcessed+=pt.readsProcessedT;
347 			basesProcessed+=pt.basesProcessedT;
348 			readsOut+=pt.readsOutT;
349 			basesOut+=pt.basesOutT;
350 			success&=pt.success;
351 		}
352 
353 		//Track whether any threads failed
354 		if(!success){errorState=true;}
355 
356 		//Do anything necessary after processing
357 
358 	}
359 
360 	/*--------------------------------------------------------------*/
361 	/*----------------         Inner Methods        ----------------*/
362 	/*--------------------------------------------------------------*/
363 
364 	/*--------------------------------------------------------------*/
365 	/*----------------         Inner Classes        ----------------*/
366 	/*--------------------------------------------------------------*/
367 
368 	/** This class is static to prevent accidental writing to shared variables.
369 	 * It is safe to remove the static modifier. */
370 	private class ProcessThread extends Thread {
371 
372 		//Constructor
373 		ProcessThread(final ConcurrentReadInputStream cris_, final ConcurrentReadOutputStream ros_, final int tid_, final int size){
374 			cris=cris_;
375 			ros=ros_;
376 			tid=tid_;
377 			localHeap=new SketchHeap(size, 0, minCount>1);
378 		}
379 
380 		//Called by start()
381 		@Override
382 		public void run(){
383 			//Do anything necessary prior to processing
384 
385 			//Process the reads
386 			processInner();
387 
388 			//Do anything necessary after processing
389 
390 			//Indicate successful exit status
391 			success=true;
392 		}
393 
394 		/** Iterate through the reads */
395 		void processInner(){
396 
397 			//Grab the first ListNum of reads
398 			ListNum<Read> ln=cris.nextList();
399 			//Grab the actual read list from the ListNum
400 			ArrayList<Read> reads=(ln!=null ? ln.list : null);
401 
402 			//Check to ensure pairing is as expected
403 			if(reads!=null && !reads.isEmpty()){
404 				Read r=reads.get(0);
405 //				assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
406 			}
407 
408 			//As long as there is a nonempty read list...
409 			while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
410 //				if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
411 
412 				//Loop through each read in the list
413 				for(int idx=0; idx<reads.size(); idx++){
414 					final Read r1=reads.get(idx);
415 					final Read r2=r1.mate;
416 
417 					//Validate reads in worker threads
418 					if(!r1.validated()){r1.validate(true);}
419 					if(r2!=null && !r2.validated()){r2.validate(true);}
420 
421 					//Track the initial length for statistics
422 					final int initialLength1=r1.length();
423 					final int initialLength2=r1.mateLength();
424 
425 					//Increment counters
426 					readsProcessedT+=r1.pairCount();
427 					basesProcessedT+=initialLength1+initialLength2;
428 
429 					//Reads are processed in this block.
430 					processReadPair(r1, r2);
431 				}
432 
433 				long count=dumpHeap();
434 				if(count>=targetKmers){break;}
435 
436 				for(Read r1 : reads){
437 					readsOutT+=r1.pairCount();
438 					basesOutT+=r1.pairLength();
439 				}
440 
441 				//Output reads to the output stream
442 				if(ros!=null){ros.add(reads, ln.id);}
443 
444 				//Notify the input stream that the list was used
445 				cris.returnList(ln);
446 //				if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
447 
448 				//Fetch a new list
449 				ln=cris.nextList();
450 				reads=(ln!=null ? ln.list : null);
451 			}
452 
453 			//Notify the input stream that the final list was used
454 			if(ln!=null){
455 				if(ln.list!=null){ln.list.clear();}
456 				cris.returnList(ln.id, true);
457 			}
458 		}
459 
460 		/**
461 		 * Process a read or a read pair.
462 		 * @param r1 Read 1
463 		 * @param r2 Read 2 (may be null)
464 		 */
465 		void processReadPair(final Read r1, final Read r2){
466 			processReadNucleotide(r1);
467 			if(r2!=null){processReadNucleotide(r2);}
468 		}
469 
470 		void processReadNucleotide(final Read r){
471 			final byte[] bases=r.bases;
472 			final byte[] quals=r.quality;
473 			long kmer=0;
474 			long rkmer=0;
475 			int len=0;
476 			assert(!r.aminoacid());
477 
478 			final long min=minHashValue;
479 			localHeap.genomeSizeBases+=r.length();
480 			localHeap.genomeSequences++;
481 
482 			if(quals==null || (minProb<=0 && minQual<2)){
483 				for(int i=0; i<bases.length; i++){
484 					byte b=bases[i];
485 					long x=AminoAcid.baseToNumber[b];
486 					long x2=AminoAcid.baseToComplementNumber[b];
487 
488 					kmer=((kmer<<2)|x)&mask;
489 					rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
490 
491 					if(x<0){len=0; rkmer=0;}else{len++;}
492 					if(len>=k){
493 						localHeap.genomeSizeKmers++;
494 						final long hashcode=hash(kmer, rkmer);
495 						if(hashcode>min){localHeap.add(hashcode);}
496 					}
497 				}
498 			}else{
499 				float prob=1;
500 				for(int i=0; i<bases.length; i++){
501 					final byte b=bases[i];
502 					final long x=AminoAcid.baseToNumber[b];
503 					final long x2=AminoAcid.baseToComplementNumber[b];
504 
505 					kmer=((kmer<<2)|x)&mask;
506 					rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
507 
508 					{//Quality-related stuff
509 						final byte q=quals[i];
510 						assert(q>=0) : Arrays.toString(quals)+"\n"+minProb+", "+minQual;
511 						prob=prob*align2.QualityTools.PROB_CORRECT[q];
512 						if(len>k){
513 							byte oldq=quals[i-k];
514 							prob=prob*align2.QualityTools.PROB_CORRECT_INVERSE[oldq];
515 						}
516 						if(x<0 || q<minQual){
517 							len=0;
518 							kmer=rkmer=0;
519 							prob=1;
520 						}else{
521 							len++;
522 						}
523 					}
524 
525 					if(len>=k && prob>=minProb){
526 						localHeap.genomeSizeKmers++;
527 						localHeap.probSum+=prob;
528 						final long hashcode=hash(kmer, rkmer);
529 						if(hashcode>min){localHeap.checkAndAdd(hashcode);}
530 					}
531 				}
532 			}
533 		}
534 
535 		private long dumpHeap(){
536 			long count=0;
537 			synchronized(sharedHeap){
538 				count=sharedHeap.genomeSizeEstimate(minCount);
539 				if(count<targetKmers){
540 					sharedHeap.add(localHeap);
541 					localHeap.clear();
542 				}
543 			}
544 			return count;
545 		}
546 
547 		/** Number of reads processed by this thread */
548 		protected long readsProcessedT=0;
549 		/** Number of bases processed by this thread */
550 		protected long basesProcessedT=0;
551 
552 		/** Number of reads retained by this thread */
553 		protected long readsOutT=0;
554 		/** Number of bases retained by this thread */
555 		protected long basesOutT=0;
556 
557 		/** True only if this thread has completed successfully */
558 		boolean success=false;
559 
560 		/** Shared input stream */
561 		private final ConcurrentReadInputStream cris;
562 		/** Shared output stream */
563 		private final ConcurrentReadOutputStream ros;
564 		/** Thread ID */
565 		final int tid;
566 
567 		final SketchHeap localHeap;
568 	}
569 
570 	/*--------------------------------------------------------------*/
571 	/*----------------            Fields            ----------------*/
572 	/*--------------------------------------------------------------*/
573 
574 	/** Primary input file path */
575 	private String in1=null;
576 	/** Secondary input file path */
577 	private String in2=null;
578 
579 	private String qfin1=null;
580 	private String qfin2=null;
581 
582 	/** Primary output file path */
583 	private String out1=null;
584 	/** Secondary output file path */
585 	private String out2=null;
586 
587 	private String qfout1=null;
588 	private String qfout2=null;
589 
590 	/** Override input file extension */
591 	private String extin=null;
592 	/** Override output file extension */
593 	private String extout=null;
594 
595 	/*--------------------------------------------------------------*/
596 
597 	/** Number of reads processed */
598 	protected long readsProcessed=0;
599 	/** Number of bases processed */
600 	protected long basesProcessed=0;
601 
602 	/** Number of reads retained */
603 	protected long readsOut=0;
604 	/** Number of bases retained */
605 	protected long basesOut=0;
606 
607 	/** Quit after processing this many input reads; -1 means no limit */
608 	private long maxReads=-1;
609 
610 	/*--------------------------------------------------------------*/
611 	/*----------------         Final Fields         ----------------*/
612 	/*--------------------------------------------------------------*/
613 
614 	/** Primary input file */
615 	private final FileFormat ffin1;
616 	/** Secondary input file */
617 	private final FileFormat ffin2;
618 
619 	/** Primary output file */
620 	private final FileFormat ffout1;
621 	/** Secondary output file */
622 	private final FileFormat ffout2;
623 
624 	private final SketchHeap sharedHeap;
625 	private final int heapSize;
626 	private final long targetKmers;
627 	private final int minCount;
628 
629 	final int shift;
630 	final int shift2;
631 	final long mask;
632 
633 	final float minProb;
634 	final byte minQual;
635 
636 	/*--------------------------------------------------------------*/
637 	/*----------------        Common Fields         ----------------*/
638 	/*--------------------------------------------------------------*/
639 
640 	/** Print status messages to this output stream */
641 	private PrintStream outstream=System.err;
642 	/** Print verbose messages */
643 	public static boolean verbose=false;
644 	/** True if an error was encountered */
645 	public boolean errorState=false;
646 	/** Overwrite existing output files */
647 	private boolean overwrite=true;
648 	/** Append to existing output files */
649 	private boolean append=false;
650 	/** Reads are output in input order (not enabled) */
651 	private boolean ordered=false;
652 	/** Shuffle input */
653 	private boolean shuffle=false;
654 
655 }
656