1 package var2;
2 
3 import java.io.PrintStream;
4 import java.util.ArrayList;
5 import java.util.Locale;
6 
7 import fileIO.ByteFile;
8 import fileIO.FileFormat;
9 import fileIO.ReadWrite;
10 import shared.Parse;
11 import shared.Parser;
12 import shared.PreParser;
13 import shared.ReadStats;
14 import shared.Shared;
15 import shared.Timer;
16 import shared.Tools;
17 import stream.ConcurrentReadInputStream;
18 import stream.ConcurrentReadOutputStream;
19 import stream.FastaReadInputStream;
20 import stream.Read;
21 import stream.SamLine;
22 import stream.SamReadStreamer;
23 import stream.SamStreamer;
24 import structures.ListNum;
25 
26 /**
27  * Removes lines with unsupported variations from a sam file.
28  *
29  * @author Brian Bushnell
30  * @date January 25, 2018
31  *
32  */
33 public class FilterSam {
34 
35 	/*--------------------------------------------------------------*/
36 	/*----------------        Initialization        ----------------*/
37 	/*--------------------------------------------------------------*/
38 
39 	/**
40 	 * Code entrance from the command line.
41 	 * @param args Command line arguments
42 	 */
main(String[] args)43 	public static void main(String[] args){
44 		//Start a timer immediately upon code entrance.
45 		Timer t=new Timer();
46 
47 		//Create an instance of this class
48 		FilterSam x=new FilterSam(args);
49 
50 		//Run the object
51 		x.process(t);
52 
53 		//Close the print stream if it was redirected
54 		Shared.closeStream(x.outstream);
55 	}
56 
57 	/**
58 	 * Constructor.
59 	 * @param args Command line arguments
60 	 */
FilterSam(String[] args)61 	public FilterSam(String[] args){
62 
63 		{//Preparse block for help, config files, and outstream
64 			PreParser pp=new PreParser(args, getClass(), false);
65 			args=pp.args;
66 			outstream=pp.outstream;
67 		}
68 
69 		//Set shared static variables
70 		ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
71 		ReadWrite.MAX_ZIP_THREADS=Shared.threads();
72 		SamLine.SET_FROM_OK=true;
73 		Var.CALL_INS=false;
74 		Var.CALL_DEL=false;
75 
76 		//Create a parser object
77 		Parser parser=new Parser();
78 
79 		//Parse each argument
80 		for(int i=0; i<args.length; i++){
81 			String arg=args[i];
82 
83 			//Break arguments into their constituent parts, in the form of "a=b"
84 			String[] split=arg.split("=");
85 			String a=split[0].toLowerCase();
86 			String b=split.length>1 ? split[1] : null;
87 
88 			if(a.equals("verbose")){
89 				verbose=Parse.parseBoolean(b);
90 			}else if(a.equals("ordered")){
91 				ordered=Parse.parseBoolean(b);
92 			}else if(a.equals("ss") || a.equals("streamer")){
93 				useStreamer=Parse.parseBoolean(b);
94 			}else if(a.equals("ploidy")){
95 				ploidy=Integer.parseInt(b);
96 			}else if(a.equals("prefilter")){
97 				prefilter=Parse.parseBoolean(b);
98 			}else if(a.equals("ref")){
99 				ref=b;
100 			}else if(a.equals("outbad") || a.equals("outb")){
101 				outBad=b;
102 			}else if(a.equals("vars") || a.equals("variants") || a.equals("varfile") || a.equals("inv")){
103 				varFile=b;
104 			}else if(a.equals("vcf") || a.equals("vcffile")){
105 				vcfFile=b;
106 			}else if(a.equals("maxsubs") || a.equals("subfilter")){
107 				maxSubs=Integer.parseInt(b);
108 			}else if(a.equals("maxvars") || a.equals("varfilter")){
109 				maxVars=Integer.parseInt(b);
110 			}else if(a.equals("maxbadsubs") || a.equals("maxbadvars") || a.equals("mbv")){
111 				maxBadVars=Integer.parseInt(b);
112 			}else if(a.equals("maxbadsubdepth") || a.equals("maxbadvardepth") || a.equals("maxbadsuballeledepth")
113 					|| a.equals("maxbadvaralleledepth") || a.equals("mbsad") || a.equals("mbvad") || a.equals("mbad")){
114 				maxBadAlleleDepth=Integer.parseInt(b);
115 			}else if(a.equals("maxbadallelefraction") || a.equals("mbaf")){
116 				maxBadAlleleFraction=Float.parseFloat(b);
117 			}else if(a.equals("minbadsubreaddepth") || a.equals("minbadreaddepth") || a.equals("mbsrd") || a.equals("mbrd")){
118 				minBadReadDepth=Integer.parseInt(b);
119 			}else if(a.equals("sub") || a.equals("subs")){
120 				Var.CALL_SUB=Parse.parseBoolean(b);
121 			}else if(a.equals("ins") || a.equals("inss")){
122 				Var.CALL_INS=Parse.parseBoolean(b);
123 			}else if(a.equals("del") || a.equals("dels")){
124 				Var.CALL_DEL=Parse.parseBoolean(b);
125 			}else if(a.equals("indel") || a.equals("indels")){
126 				Var.CALL_INS=Var.CALL_DEL=Parse.parseBoolean(b);
127 			}else if(a.equals("minedist") || a.equals("minenddist") || a.equals("border")){
128 				minEDist=Integer.parseInt(b);
129 			}else if(a.equals("parse_flag_goes_here")){
130 				long fake_variable=Parse.parseKMG(b);
131 				//Set a variable here
132 			}else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
133 				//do nothing
134 			}else{
135 				outstream.println("Unknown parameter "+args[i]);
136 				assert(false) : "Unknown parameter "+args[i];
137 			}
138 		}
139 
140 		{//Process parser fields
141 			Parser.processQuality();
142 
143 			maxReads=parser.maxReads;
144 
145 			overwrite=ReadStats.overwrite=parser.overwrite;
146 			append=ReadStats.append=parser.append;
147 
148 			in1=parser.in1;
149 
150 			outGood=parser.out1;
151 		}
152 
153 		assert(FastaReadInputStream.settingsOK());
154 
155 		//Ensure there is an input file
156 //		if(in1==null){throw new RuntimeException("Error - an input file and a VCF file are required.");}
157 		if(in1==null){throw new RuntimeException("Error - an input file is required.");}
158 
159 		//Adjust the number of threads for input file reading
160 		if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
161 			ByteFile.FORCE_MODE_BF2=true;
162 		}
163 
164 		//Ensure output files can be written
165 		if(!Tools.testOutputFiles(overwrite, append, false, outGood, outBad)){
166 			outstream.println((outGood==null)+", "+(outBad==null));
167 			throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+outGood+", "+outBad+"\n");
168 		}
169 
170 		//Ensure input files can be read
171 		if(!Tools.testInputFiles(false, true, in1, vcfFile, varFile)){
172 			throw new RuntimeException("\nCan't read some input files.\n");
173 		}
174 
175 		//Ensure that no file was specified multiple times
176 		if(!Tools.testForDuplicateFiles(true, in1, vcfFile, varFile, outBad, outGood)){
177 			throw new RuntimeException("\nSome file names were specified multiple times.\n");
178 		}
179 
180 		//Create output FileFormat objects
181 		ffoutGood=FileFormat.testOutput(outGood, FileFormat.SAM, null, true, overwrite, append, ordered);
182 		ffoutBad=FileFormat.testOutput(outBad, FileFormat.SAM, null, true, overwrite, append, ordered);
183 
184 		//Create input FileFormat objects
185 		ffin1=FileFormat.testInput(in1, FileFormat.SAM, null, true, true);
186 
187 		Timer t=new Timer(outstream, true);
188 		{
189 			t.start();
190 			outstream.print("Loading scaffolds:  ");
191 			if(ref==null){
192 				scafMap=ScafMap.loadSamHeader(in1);
193 			}else{
194 				scafMap=ScafMap.loadReference(ref, true);
195 			}
196 			assert(scafMap!=null && scafMap.size()>0) : "No scaffold names were loaded.";
197 			t.stop(pad(scafMap.size(), 12)+" \t");
198 		}
199 		t.start();
200 		if(varFile!=null){
201 			outstream.print("Loading vars:       ");
202 			varMap=VcfLoader.loadVarFile(varFile, scafMap);
203 			t.stop(pad(varMap.size(), 12)+" \t");
204 		}else if(vcfFile!=null){t.start();
205 			outstream.print("Loading vcf:        ");
206 			varMap=VcfLoader.loadVcfFile(vcfFile, scafMap, true, false);
207 			t.stop(pad(varMap.size(), 12)+" \t");
208 		}else{
209 			outstream.print("Calling variants:\n");
210 			String inString="in="+in1;
211 
212 //			private int maxBadAlleleDepth=2;
213 //			/** Maximum allele fraction for variants considered unsupported */
214 //			private float maxBadAlleleFraction=0.01f;
215 
216 			String[] cvargs=new String[] {inString, "ref="+ref, "clearfilters", "minreads="+(maxBadAlleleDepth),
217 					"minallelefraction="+maxBadAlleleFraction, "printexecuting=f"};
218 			CallVariants cv=new CallVariants(cvargs);
219 			cv.prefilter=prefilter;
220 			cv.ploidy=ploidy;
221 
222 			varMap=cv.process(new Timer());
223 			scafMap=cv.scafMap;
224 			t.stop(/*pad(varMap.size(), 12)+" \t"*/);
225 		}
226 
227 		assert(Var.CALL_INS || Var.CALL_DEL || Var.CALL_SUB) : "Must enable at least one of subs, insertions, or deletions.";
228 		subsOnly=!Var.CALL_INS && !Var.CALL_DEL;
229 	}
230 
231 	/*--------------------------------------------------------------*/
232 	/*----------------         Outer Methods        ----------------*/
233 	/*--------------------------------------------------------------*/
234 
235 	/** Create read streams and process all data */
process(Timer t)236 	void process(Timer t){
237 
238 		//Turn off read validation in the input threads to increase speed
239 		final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
240 		Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
241 
242 
243 		final SamReadStreamer ss;
244 		//Create a read input stream
245 		final ConcurrentReadInputStream cris;
246 		if(useStreamer){
247 			cris=null;
248 			ss=new SamReadStreamer(ffin1, streamerThreads, true, maxReads);
249 			ss.start();
250 			if(verbose){outstream.println("Started streamer");}
251 		}else{
252 			ss=null;
253 			cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, null);
254 			cris.start(); //Start the stream
255 			if(verbose){outstream.println("Started cris");}
256 		}
257 
258 		//Optionally create a read output stream
259 		final ConcurrentReadOutputStream ros, rosb;
260 		if(ffoutGood!=null){
261 			//Select output buffer size based on whether it needs to be ordered
262 			final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
263 
264 			ros=ConcurrentReadOutputStream.getStream(ffoutGood, null, null, null, buff, null, true);
265 			ros.start(); //Start the stream
266 		}else{ros=null;}
267 		if(ffoutBad!=null){
268 			//Select output buffer size based on whether it needs to be ordered
269 			final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
270 
271 			rosb=ConcurrentReadOutputStream.getStream(ffoutBad, null, null, null, buff, null, false);
272 			rosb.start(); //Start the stream
273 		}else{rosb=null;}
274 
275 		//Reset counters
276 		readsProcessed=0;
277 		basesProcessed=0;
278 
279 		//Process the reads in separate threads
280 		spawnThreads(cris, ss, ros, rosb);
281 
282 		if(verbose){outstream.println("Finished; closing streams.");}
283 
284 		//Write anything that was accumulated by ReadStats
285 		errorState|=ReadStats.writeAll();
286 		//Close the read streams
287 		errorState|=ReadWrite.closeStreams(cris, ros, rosb);
288 
289 		//Reset read validation
290 		Read.VALIDATE_IN_CONSTRUCTOR=vic;
291 
292 		//Report timing and results
293 		{
294 			t.stop();
295 
296 			//Calculate units per nanosecond
297 			double rpnano=readsProcessed/(double)(t.elapsed);
298 			double bpnano=basesProcessed/(double)(t.elapsed);
299 
300 			final long rg=readsOut, rb=readsProcessed-readsOut;
301 			final long bg=basesOut, bb=basesProcessed-basesOut;
302 
303 			//Add "k" and "m" for large numbers
304 //			String rpstring=(readsProcessed<100000 ? ""+readsProcessed : readsProcessed<100000000 ? (readsProcessed/1000)+"k" : (readsProcessed/1000000)+"m");
305 //			String bpstring=(basesProcessed<100000 ? ""+basesProcessed : basesProcessed<100000000 ? (basesProcessed/1000)+"k" : (basesProcessed/1000000)+"m");
306 //			String rgstring=(rg<100000 ? ""+rg : rg<100000000 ? (rg/1000)+"k" : (rg/1000000)+"m");
307 //			String bgstring=(bg<100000 ? ""+bg : bg<100000000 ? (bg/1000)+"k" : (bg/1000000)+"m");
308 //			String rbstring=(rb<100000 ? ""+rb : rb<100000000 ? (rb/1000)+"k" : (rb/1000000)+"m");
309 //			String bbstring=(bb<100000 ? ""+bb : bb<100000000 ? (bb/1000)+"k" : (bb/1000000)+"m");
310 
311 			final int len=12;
312 
313 			final String rpstring=pad(readsProcessed, len);
314 			final String bpstring=pad(basesProcessed, len);
315 			final String rgstring=pad(rg, len);
316 			final String bgstring=pad(bg, len);
317 			final String rbstring=pad(rb, len);
318 			final String bbstring=pad(bb, len);
319 
320 			final double mappedReadsDiscarded=mappedReadsProcessed-mappedReadsRetained;
321 			double avgQGood=(bqSumGood/(double)mappedReadsRetained);
322 			double avgQBad=(bqSumBad/(double)mappedReadsDiscarded);
323 			double avgMapQGood=(mapqSumGood/(double)mappedReadsRetained);
324 			double avgMapQBad=(mapqSumBad/(double)mappedReadsDiscarded);
325 			double avgVarsGood=(varSumGood/(double)mappedReadsRetained);
326 			double avgVarsBad=(varSumBad/(double)mappedReadsDiscarded);
327 			final String avgQGoodS=pad(String.format(Locale.ROOT, "%.2f", avgQGood), len);
328 			final String avgQBadS=pad(String.format(Locale.ROOT, "%.2f", avgQBad), len);
329 			final String avgMapQGoodS=pad(String.format(Locale.ROOT, "%.2f", avgMapQGood), len);
330 			final String avgMapQBadS=pad(String.format(Locale.ROOT, "%.2f", avgMapQBad), len);
331 			final String avgVarsGoodS=pad(String.format(Locale.ROOT, "%.2f", avgVarsGood), len);
332 			final String avgVarsBadS=pad(String.format(Locale.ROOT, "%.2f", avgVarsBad), len);
333 
334 			outstream.println("Time:                         \t\t"+t);
335 			outstream.println("Reads Processed:    "+rpstring+" \t"+String.format(Locale.ROOT, "%.2fk reads/sec", rpnano*1000000));
336 			outstream.println("Bases Processed:    "+bpstring+" \t"+String.format(Locale.ROOT, "%.2fm bases/sec", bpnano*1000));
337 			outstream.println();
338 			outstream.println("Reads Retained:     "+rgstring+" \t"+String.format(Locale.ROOT, "%.2f%%", (rg*100.0/readsProcessed)));
339 			outstream.println("Bases Retained:     "+bgstring+" \t"+String.format(Locale.ROOT, "%.2f%%", (bg*100.0/basesProcessed)));
340 			outstream.println("Avg. Qual Retained: "+avgQGoodS);
341 			outstream.println("Avg. MapQ Retained: "+avgMapQGoodS);
342 			outstream.println("Avg. Vars Retained: "+avgVarsGoodS);
343 			outstream.println();
344 			outstream.println("Reads Discarded:    "+rbstring+" \t"+String.format(Locale.ROOT, "%.2f%%", (rb*100.0/readsProcessed)));
345 			outstream.println("Bases Discarded:    "+bbstring+" \t"+String.format(Locale.ROOT, "%.2f%%", (bb*100.0/basesProcessed)));
346 			outstream.println("Avg. Qual Discarded:"+avgQBadS);
347 			outstream.println("Avg. MapQ Discarded:"+avgMapQBadS);
348 			outstream.println("Avg. Vars Discarded:"+avgVarsBadS);
349 		}
350 
351 		//Throw an exception of there was an error in a thread
352 		if(errorState){
353 			throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
354 		}
355 	}
356 
357 	private static String pad(long s, int len){
358 		return pad(""+s, len);
359 	}
360 
361 	private static String pad(String s, int len){
362 		while(s.length()<len){s=" "+s;}
363 		return s;
364 	}
365 
366 	/** Spawn process threads */
367 	private void spawnThreads(final ConcurrentReadInputStream cris, final SamStreamer ss, final ConcurrentReadOutputStream ros, final ConcurrentReadOutputStream rosb){
368 
369 		//Do anything necessary prior to processing
370 
371 		//Determine how many threads may be used
372 		final int threads=Shared.threads();
373 
374 		//Fill a list with ProcessThreads
375 		ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
376 		for(int i=0; i<threads; i++){
377 			alpt.add(new ProcessThread(cris, ss, ros, rosb, i));
378 		}
379 
380 		//Start the threads
381 		for(ProcessThread pt : alpt){
382 			pt.start();
383 		}
384 
385 		//Wait for completion of all threads
386 		boolean success=true;
387 		for(ProcessThread pt : alpt){
388 
389 			//Wait until this thread has terminated
390 			while(pt.getState()!=Thread.State.TERMINATED){
391 				try {
392 					//Attempt a join operation
393 					pt.join();
394 				} catch (InterruptedException e) {
395 					//Potentially handle this, if it is expected to occur
396 					e.printStackTrace();
397 				}
398 			}
399 
400 			//Accumulate per-thread statistics
401 			readsProcessed+=pt.readsProcessedT;
402 			basesProcessed+=pt.basesProcessedT;
403 			mappedReadsProcessed+=pt.mappedReadsProcessedT;
404 			mappedBasesProcessed+=pt.mappedBasesProcessedT;
405 			mappedReadsRetained+=pt.mappedReadsRetainedT;
406 			mappedBasesRetained+=pt.mappedBasesRetainedT;
407 			readsOut+=pt.readsOutT;
408 			basesOut+=pt.basesOutT;
409 			bqSumGood+=pt.qSumGoodT;
410 			bqSumBad+=pt.qSumBadT;
411 //			adSumGood+=pt.adSumGoodT;
412 //			adSumBad+=pt.adSumBadT;
413 //			rdSumGood+=pt.rdSumGoodT;
414 //			rdSumBad+=pt.rdSumBadT;
415 			varSumGood+=pt.varSumGoodT;
416 			varSumBad+=pt.varSumBadT;
417 			mapqSumGood+=pt.mapqSumGoodT;
418 			mapqSumBad+=pt.mapqSumBadT;
419 			success&=pt.success;
420 		}
421 
422 		//Track whether any threads failed
423 		if(!success){errorState=true;}
424 
425 		//Do anything necessary after processing
426 
427 	}
428 
429 	/*--------------------------------------------------------------*/
430 	/*----------------         Inner Methods        ----------------*/
431 	/*--------------------------------------------------------------*/
432 
433 	/*--------------------------------------------------------------*/
434 	/*----------------         Inner Classes        ----------------*/
435 	/*--------------------------------------------------------------*/
436 
437 	/** This class is static to prevent accidental writing to shared variables.
438 	 * It is safe to remove the static modifier. */
439 	private class ProcessThread extends Thread {
440 
441 		//Constructor
442 		ProcessThread(final ConcurrentReadInputStream cris_, final SamStreamer ss_, final ConcurrentReadOutputStream ros_, final ConcurrentReadOutputStream rosb_, final int tid_){
443 			cris=cris_;
444 			ss=ss_;
445 			ros=ros_;
446 			rosb=rosb_;
447 			tid=tid_;
448 		}
449 
450 		//Called by start()
451 		@Override
452 		public void run(){
453 			//Do anything necessary prior to processing
454 
455 			//Process the reads
456 			if(useStreamer){
457 				processSS();
458 			}else{
459 				processCris();
460 			}
461 
462 			//Do anything necessary after processing
463 
464 			//Indicate successful exit status
465 			success=true;
466 		}
467 
468 		/** Iterate through the reads */
469 		void processCris(){
470 
471 			//Grab the first ListNum of reads
472 			ListNum<Read> ln=cris.nextList();
473 			//Grab the actual read list from the ListNum
474 			ArrayList<Read> reads=(ln!=null ? ln.list : null);
475 
476 			//Check to ensure pairing is as expected
477 			if(reads!=null && !reads.isEmpty()){
478 				Read r=reads.get(0);
479 //				assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
480 			}
481 
482 			//As long as there is a nonempty read list...
483 			while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
484 //				if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
485 
486 				ArrayList<Read> good=new ArrayList<Read>(reads.size());
487 				ArrayList<Read> bad=new ArrayList<Read>(Tools.max(1, reads.size()/4));
488 
489 				//Loop through each read in the list
490 				for(int idx=0; idx<reads.size(); idx++){
491 					final Read r1=reads.get(idx);
492 
493 					//Validate reads in worker threads
494 					if(!r1.validated()){r1.validate(true);}
495 
496 					//Track the initial length for statistics
497 					final int initialLength1=r1.length();
498 
499 					//Increment counters
500 					readsProcessedT++;
501 					basesProcessedT+=initialLength1;
502 
503 					{
504 						//Reads are processed in this block.
505 						boolean keep=processRead(r1);
506 						if(keep){
507 							//Increment counters
508 							readsOutT++;
509 							basesOutT+=initialLength1;
510 							good.add(r1);
511 						}else{
512 							bad.add(r1);
513 						}
514 					}
515 				}
516 
517 				//Output reads to the output stream
518 				if(ros!=null){ros.add(good, ln.id);}
519 				if(rosb!=null){rosb.add(bad, ln.id);}
520 
521 				//Notify the input stream that the list was used
522 				cris.returnList(ln);
523 //				if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
524 
525 				//Fetch a new list
526 				ln=cris.nextList();
527 				reads=(ln!=null ? ln.list : null);
528 			}
529 
530 			//Notify the input stream that the final list was used
531 			if(ln!=null){
532 				cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
533 			}
534 		}
535 
536 		/** Iterate through the reads */
537 		void processSS(){
538 
539 			//Grab the first ListNum of reads
540 			ListNum<Read> ln=ss.nextList();
541 			//Grab the actual read list from the ListNum
542 			ArrayList<Read> reads=(ln!=null ? ln.list : null);
543 
544 			//Check to ensure pairing is as expected
545 			if(reads!=null && !reads.isEmpty()){
546 				Read r=reads.get(0);
547 //				assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); //Disabled due to non-static access
548 			}
549 
550 			//As long as there is a nonempty read list...
551 			while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
552 //				if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
553 
554 				ArrayList<Read> good=new ArrayList<Read>(reads.size());
555 				ArrayList<Read> bad=new ArrayList<Read>(Tools.max(1, reads.size()/4));
556 
557 				//Loop through each read in the list
558 				for(int idx=0; idx<reads.size(); idx++){
559 					final Read r1=reads.get(idx);
560 
561 					//Validate reads in worker threads
562 					if(!r1.validated()){r1.validate(true);}
563 
564 					//Track the initial length for statistics
565 					final int initialLength1=r1.length();
566 
567 					//Increment counters
568 					readsProcessedT++;
569 					basesProcessedT+=initialLength1;
570 
571 					{
572 						//Reads are processed in this block.
573 						boolean keep=processRead(r1);
574 						if(keep){
575 							//Increment counters
576 							readsOutT++;
577 							basesOutT+=initialLength1;
578 							good.add(r1);
579 						}else{
580 							bad.add(r1);
581 						}
582 					}
583 				}
584 
585 				//Output reads to the output stream
586 				if(ros!=null){ros.add(good, ln.id);}
587 				if(rosb!=null){rosb.add(bad, ln.id);}
588 
589 				//Fetch a new list
590 				ln=ss.nextList();
591 				reads=(ln!=null ? ln.list : null);
592 			}
593 
594 			//Notify the input stream that the final list was used
595 			if(ln!=null){
596 				cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
597 			}
598 		}
599 
600 		/**
601 		 * Process a read.
602 		 * @param r1 Read 1
603 		 * @return True if the read should be kept, false if it should be discarded.
604 		 */
605 		boolean processRead(final Read r1){
606 			return passesVariantFilter(r1);
607 		}
608 
609 		private final boolean passesVariantFilter(Read r){
610 			if(!r.mapped() || r.bases==null || r.samline==null || r.match==null){return true;}
611 			final int vars=(subsOnly ? Read.countSubs(r.match) : Read.countVars(r.match, Var.CALL_SUB, Var.CALL_INS, Var.CALL_DEL));
612 			final int len=r.length();
613 			final double q=r.avgQualityByProbabilityDouble(false, r.length());
614 			final SamLine sl=r.samline;
615 			mappedReadsProcessedT++;
616 			mappedBasesProcessedT+=len;
617 
618 			if(vars==0){
619 				varSumGoodT+=vars;
620 				qSumGoodT+=q;
621 				mapqSumGoodT+=sl.mapq;
622 				mappedReadsRetainedT++;
623 				mappedBasesRetainedT+=len;
624 				return true;
625 			}
626 
627 			if((maxVars>=0 && vars>maxVars) ||
628 					(maxSubs>=0 && vars>maxSubs && ((subsOnly ? vars : Read.countSubs(r.match))>maxSubs))){
629 				varSumBadT+=vars;
630 				qSumBadT+=q;
631 				mapqSumBadT+=sl.mapq;
632 				return false;
633 			}
634 
635 			if(vars<=maxBadVars){
636 				varSumGoodT+=vars;
637 				qSumGoodT+=q;
638 				mapqSumGoodT+=sl.mapq;
639 				mappedReadsRetainedT++;
640 				mappedBasesRetainedT+=len;
641 				return true;
642 			}
643 
644 			final ArrayList<Var> list;
645 			if(subsOnly){
646 				list=CallVariants.findUniqueSubs(r, sl, varMap, scafMap, maxBadAlleleDepth, maxBadAlleleFraction, minBadReadDepth, minEDist);
647 			}else{
648 				list=CallVariants.findUniqueVars(r, sl, varMap, scafMap, maxBadAlleleDepth, maxBadAlleleFraction, minBadReadDepth, minEDist);
649 			}
650 			if(list==null || list.size()<=maxBadVars){
651 				varSumGoodT+=vars;
652 				qSumGoodT+=q;
653 				mapqSumGoodT+=sl.mapq;
654 				mappedReadsRetainedT++;
655 				mappedBasesRetainedT+=len;
656 				return true;
657 			}else{
658 				assert(list.size()>maxBadVars) : list.size()+", "+maxBadVars;
659 				for(Var v : list){
660 					assert(v!=null) : sl.cigar+"\n"+vars+"\n"+v+"\n"+list;
661 					assert((v.type()==Var.SUB && Var.CALL_SUB) || (v.type()==Var.INS && Var.CALL_INS) || (v.type()==Var.DEL && Var.CALL_DEL)) :
662 						list.size()+", "+maxBadVars+"\n"+sl.cigar+"\n"+v+"\n"+list;
663 				}
664 				varSumBadT+=vars;
665 				qSumBadT+=q;
666 				mapqSumBadT+=sl.mapq;
667 				return false;
668 			}
669 		}
670 
671 		/** Number of reads processed by this thread */
672 		protected long readsProcessedT=0;
673 		/** Number of reads processed by this thread */
674 		protected long basesProcessedT=0;
675 		/** Number of mapped reads processed by this thread */
676 		protected long mappedReadsProcessedT=0;
677 		/** Number of mapped bases processed by this thread */
678 		protected long mappedBasesProcessedT=0;
679 		/** Number of mapped reads retained by this thread */
680 		protected long mappedReadsRetainedT=0;
681 		/** Number of mapped bases retained by this thread */
682 		protected long mappedBasesRetainedT=0;
683 		/** Number of good reads processed by this thread */
684 		protected long readsOutT=0;
685 		/** Number of good bases processed by this thread */
686 		protected long basesOutT=0;
687 		protected double qSumGoodT=0;
688 		protected double qSumBadT=0;
689 		protected long varSumGoodT=0;
690 		protected long varSumBadT=0;
691 		protected long mapqSumGoodT=0;
692 		protected long mapqSumBadT=0;
693 
694 		/** True only if this thread has completed successfully */
695 		boolean success=false;
696 
697 		/** Shared input stream */
698 		private final ConcurrentReadInputStream cris;
699 		/** Shared input stream */
700 		private final SamStreamer ss;
701 		/** Good output stream */
702 		private final ConcurrentReadOutputStream ros;
703 		/** Bad output stream */
704 		private final ConcurrentReadOutputStream rosb;
705 		/** Thread ID */
706 		final int tid;
707 	}
708 
709 	/*--------------------------------------------------------------*/
710 	/*----------------            Fields            ----------------*/
711 	/*--------------------------------------------------------------*/
712 
713 	/** Primary input file path */
714 	private String in1=null;
715 
716 	/** Optional reference */
717 	private String ref=null;
718 
719 	/** Good output file path */
720 	private String outGood=null;
721 	/** Bad output file path */
722 	private String outBad=null;
723 
724 	/** VCF file path */
725 	private String varFile=null;
726 	/** Variant file path */
727 	private String vcfFile=null;
728 	private VarMap varMap=null;
729 	private ScafMap scafMap=null;
730 
731 	/*--------------------------------------------------------------*/
732 
733 	/** Maximum allowed substitutions in a read */
734 	private int maxSubs=-1;
735 	/** Maximum allowed variations in a read */
736 	private int maxVars=-1;
737 
738 	/** Maximum allowed unsupported substitutions in a read */
739 	private int maxBadVars=1;
740 	/** Maximum variant depth for a variant to be considered unsupported */
741 	private int maxBadAlleleDepth=2;
742 	/** Maximum allele fraction for variants considered unsupported */
743 	private float maxBadAlleleFraction=0.01f;
744 	/** Minimum read depth for a variant to be considered unsupported */
745 	private int minBadReadDepth=2;
746 	/** Ignore vars within this distance of the ends */
747 	private int minEDist=5;
748 	private int ploidy=1;
749 	private boolean prefilter=false;
750 
751 	/** Number of reads processed */
752 	protected long readsProcessed=0;
753 	/** Number of bases processed */
754 	protected long basesProcessed=0;
755 	/** Number of mapped reads processed by this thread */
756 	protected long mappedReadsProcessed=0;
757 	/** Number of mapped bases processed by this thread */
758 	protected long mappedBasesProcessed=0;
759 	/** Number of mapped reads retained by this thread */
760 	protected long mappedReadsRetained=0;
761 	/** Number of mapped bases retained by this thread */
762 	protected long mappedBasesRetained=0;
763 	/** Number of good reads processed */
764 	protected long readsOut=0;
765 	/** Number of good bases processed */
766 	protected long basesOut=0;
767 
768 	protected double bqSumGood=0;
769 	protected double bqSumBad=0;
770 //	protected double adSumGood=0;
771 //	protected double adSumBad=0;
772 //	protected double rdSumGood=0;
773 //	protected double rdSumBad=0;
774 
775 	protected long varSumGood=0;
776 	protected long varSumBad=0;
777 	protected long mapqSumGood=0;
778 	protected long mapqSumBad=0;
779 
780 	/** Quit after processing this many input reads; -1 means no limit */
781 	private long maxReads=-1;
782 
783 	static boolean useStreamer=true;
784 	static int streamerThreads=3;
785 
786 	/*--------------------------------------------------------------*/
787 	/*----------------         Final Fields         ----------------*/
788 	/*--------------------------------------------------------------*/
789 
790 	/** Primary input file */
791 	private final FileFormat ffin1;
792 
793 	/** Primary output file */
794 	private final FileFormat ffoutGood;
795 	/** Secondary output file */
796 	private final FileFormat ffoutBad;
797 
798 	private final boolean subsOnly;
799 
800 	/*--------------------------------------------------------------*/
801 	/*----------------        Common Fields         ----------------*/
802 	/*--------------------------------------------------------------*/
803 
804 	/** Print status messages to this output stream */
805 	private PrintStream outstream=System.err;
806 	/** Print verbose messages */
807 	public static boolean verbose=false;
808 	/** True if an error was encountered */
809 	public boolean errorState=false;
810 	/** Overwrite existing output files */
811 	private boolean overwrite=false;
812 	/** Append to existing output files */
813 	private boolean append=false;
814 	/** Reads are output in input order */
815 	private boolean ordered=true;
816 
817 }
818