1 package icecream;
2 
3 import java.io.PrintStream;
4 import java.util.ArrayList;
5 import java.util.Random;
6 
7 import aligner.Aligner;
8 import consensus.BaseGraph;
9 import dna.AminoAcid;
10 import dna.Data;
11 import fileIO.ByteFile;
12 import fileIO.ByteStreamWriter;
13 import fileIO.FileFormat;
14 import fileIO.ReadWrite;
15 import json.JsonObject;
16 import prok.GeneCaller;
17 import shared.KillSwitch;
18 import shared.Parse;
19 import shared.Parser;
20 import shared.PreParser;
21 import shared.ReadStats;
22 import shared.Shared;
23 import shared.Timer;
24 import shared.Tools;
25 import shared.TrimRead;
26 import stream.ConcurrentReadOutputStream;
27 import stream.FASTQ;
28 import stream.FastaReadInputStream;
29 import stream.Read;
30 import stream.SamLine;
31 import structures.ByteBuilder;
32 import structures.EntropyTracker;
33 import structures.IntHashSet;
34 import structures.IntList;
35 
36 /**
37  * Version of Reformat designed for PacBio data.
38  * Supports some of Reformat's capability, like subsampling,
39  * in a ZMW-aware tool.
40  *
41  * @author Brian Bushnell
42  * @date June 5, 2019
43  *
44  */
45 public final class ReformatPacBio {
46 
47 	/*--------------------------------------------------------------*/
48 	/*----------------        Initialization        ----------------*/
49 	/*--------------------------------------------------------------*/
50 
51 	/**
52 	 * Code entrance from the command line.
53 	 * @param args Command line arguments
54 	 */
main(String[] args)55 	public static void main(String[] args){
56 		//Start a timer immediately upon code entrance.
57 		Timer t=new Timer();
58 
59 		//Create an instance of this class
60 		ReformatPacBio x=new ReformatPacBio(args);
61 
62 		//Run the object
63 		x.process(t);
64 
65 		//Close the print stream if it was redirected
66 		Shared.closeStream(x.outstream);
67 	}
68 
69 	/**
70 	 * Constructor.
71 	 * @param args Command line arguments
72 	 */
ReformatPacBio(String[] args)73 	public ReformatPacBio(String[] args){
74 
75 		{//Preparse block for help, config files, and outstream
76 			PreParser pp=new PreParser(args, getClass(), false);
77 			args=pp.args;
78 			outstream=pp.outstream;
79 		}
80 
81 		//Set shared static variables prior to parsing
82 		ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
83 		ReadWrite.USE_BGZIP=ReadWrite.USE_UNBGZIP=ReadWrite.PREFER_BGZIP=true;
84 		ReadWrite.MAX_ZIP_THREADS=Shared.threads();
85 		FASTQ.TEST_INTERLEAVED=FASTQ.FORCE_INTERLEAVED=false;
86 		SamLine.SET_FROM_OK=true;
87 		Shared.setBufferData(1000000);
88 //		Shared.FASTA_WRAP=511;
89 		Data.USE_SAMBAMBA=false;//Sambamba changes PacBio headers.
90 		Read.CHANGE_QUALITY=false;
91 		EntropyTracker.defaultK=3;
92 
93 		{//Parse the arguments
94 			final Parser parser=parse(args);
95 			Parser.processQuality();
96 
97 			maxReads=parser.maxReads;
98 			overwrite=ReadStats.overwrite=parser.overwrite;
99 			append=ReadStats.append=parser.append;
100 
101 			in1=parser.in1;
102 			extin=parser.extin;
103 
104 			if(outg==null){outg=parser.out1;}
105 			extout=parser.extout;
106 		}
107 
108 		sampleReadsExact=sampleReadsTarget>-1;
109 		sampleBasesExact=sampleBasesTarget>-1;
110 		sampleZMWsExact=sampleZMWsTarget>-1;
111 		sampleExact=(sampleReadsExact || sampleBasesExact || sampleZMWsExact);
112 
113 		//Determine how many threads may be used
114 		threads=(sampleExact || (samplerate>0 && seed>=0)) ? 1 : Shared.threads();
115 
116 		//Ensure there is an input file
117 		if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
118 		fixExtensions(); //Add or remove .gz or .bz2 as needed
119 		checkFileExistence(); //Ensure files can be read and written
120 		checkStatics(); //Adjust file-related static fields as needed for this program
121 
122 		//Create output FileFormat objects
123 		ffoutg=FileFormat.testOutput(outg, FileFormat.FASTQ, extout, true, overwrite, append, false);
124 		ffoutb=FileFormat.testOutput(outb, FileFormat.FASTQ, extout, true, overwrite, append, false);
125 		ffstats=FileFormat.testOutput(outstats, FileFormat.TXT, null, false, overwrite, append, false);
126 		ffschist=FileFormat.testOutput(schist, FileFormat.TXT, null, false, overwrite, append, false);
127 
128 		//Create input FileFormat objects
129 		ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
130 		if(verbose){System.err.println("Finished constructor for "+getClass().getName());}
131 	}
132 
133 	/*--------------------------------------------------------------*/
134 	/*----------------    Initialization Helpers    ----------------*/
135 	/*--------------------------------------------------------------*/
136 
137 	/** Parse arguments from the command line */
parse(String[] args)138 	private Parser parse(String[] args){
139 		//Create a parser object
140 		Parser parser=new Parser();
141 
142 		//Set any necessary Parser defaults here
143 		//parser.foo=bar;
144 
145 		//Parse each argument
146 		for(int i=0; i<args.length; i++){
147 			String arg=args[i];
148 
149 			//Break arguments into their constituent parts, in the form of "a=b"
150 			String[] split=arg.split("=");
151 			String a=split[0].toLowerCase();
152 			String b=split.length>1 ? split[1] : null;
153 			if(b!=null && b.equalsIgnoreCase("null")){b=null;}
154 
155 			if(a.equals("verbose")){
156 				verbose=Parse.parseBoolean(b);
157 			}else if(a.equals("format")){
158 				if(b==null){
159 					assert(false) : arg;
160 				}else if(Tools.isDigit(b.charAt(i))){
161 					format=Integer.parseInt(b);
162 				}else if(b.equalsIgnoreCase("json")){
163 					format=FORMAT_JSON;
164 				}else if(b.equalsIgnoreCase("text")){
165 					format=FORMAT_TEXT;
166 				}else{
167 					assert(false) : arg;
168 				}
169 				assert(format>=1 && format<=2) : arg;
170 			}else if(a.equals("json")){
171 				boolean x=Parse.parseBoolean(b);
172 				format=(x ? FORMAT_JSON : FORMAT_TEXT);
173 			}else if(a.equals("ss") || a.equals("samstreamer") || a.equals("streamer")){
174 				if(b!=null && Tools.isDigit(b.charAt(0))){
175 					ZMWStreamer.useStreamer=true;
176 					assert(Integer.parseInt(b)==1) : "ZMWStreamer threads currently capped at 1.";
177 //					ZMWStreamer.streamerThreads=Tools.max(1, Integer.parseInt(b));
178 				}else{
179 					ZMWStreamer.useStreamer=Parse.parseBoolean(b);
180 				}
181 			}
182 
183 			else if(a.equals("keepshortreads") || a.equals("ksr")){
184 				keepShortReads=Parse.parseBoolean(b);
185 			}else if(a.equalsIgnoreCase("keepzmwstogether") || a.equals("kzt") || a.equals("keepreadstogether") || a.equals("krt")){
186 				keepZMWsTogether=Parse.parseBoolean(b);
187 			}else if(a.equalsIgnoreCase("subsampleFromEnds")){
188 				subsampleFromEnds=Parse.parseBoolean(b);
189 			}
190 
191 			else if(a.equals("ccsinput") || a.equals("ccsin")){
192 				CCSInput=Parse.parseBoolean(b);
193 			}else if(a.equals("minlength") || a.equals("minlen")){
194 				minLengthAfterTrimming=Integer.parseInt(b);
195 			}else if(a.equals("flaglongreads")){
196 				flagLongReads=Parse.parseBoolean(b);
197 			}else if(a.equals("longreadmult")){
198 				longReadMult=Float.parseFloat(b);
199 			}else if(a.equals("trimreads") || a.equals("trim")){
200 				trimReads=Parse.parseBoolean(b);
201 			}else if(a.equals("parsecustom")){
202 				parseCustom=Parse.parseBoolean(b);
203 			}else if(a.equals("outg") || a.equals("outgood")){
204 				outg=b;
205 			}else if(a.equals("outb") || a.equals("outbad")){
206 				outb=b;
207 			}else if(a.equals("outs") || a.equals("outstats") || a.equals("stats")){
208 				outstats=b;
209 			}else if(a.equals("schist") || a.equals("shist")){
210 				schist=b;
211 			}
212 
213 			else if(a.equals("shredlen")){
214 				shredLength=Integer.parseInt(b);
215 			}else if(a.equals("overlap")){
216 				overlap=Integer.parseInt(b);
217 			}else if(a.equalsIgnoreCase("minShredIdentity") || a.equalsIgnoreCase("minShredId") ||
218 					 a.equalsIgnoreCase("shredId")){
219 				minShredIdentity=Tools.max(0.01f, Float.parseFloat(b));
220 				if(minShredIdentity>1){minShredIdentity*=0.01f;}
221 				assert(minShredIdentity>0 && minShredIdentity<=1) :
222 					"minShredIdentity ranges from 0 (exclusive) to 1 (inclusive).";
223 			}else if(a.equals("ccs") || a.equals("makeccs") || a.equals("consensus")){
224 				makeCCS=Parse.parseBoolean(b);
225 			}else if(a.equals("findorientation") || a.equals("orient") || a.equals("reorient")){
226 				findOrientation=Parse.parseBoolean(b);
227 			}
228 
229 			else if(a.equals("minpasses")){
230 				minPasses=Float.parseFloat(b);
231 			}else if(a.equals("minsubreads")){
232 				minSubreads=Integer.parseInt(b);
233 			}
234 
235 			else if(a.equalsIgnoreCase("samplerate")){
236 				samplerate=Float.parseFloat(b);
237 			}else if(a.equalsIgnoreCase("sampleReadsTarget") || a.equals("srt")){
238 				sampleReadsTarget=Parse.parseKMG(b);
239 			}else if(a.equalsIgnoreCase("sampleBasesTarget") || a.equals("sbt")){
240 				sampleBasesTarget=Parse.parseKMG(b);
241 			}else if(a.equalsIgnoreCase("sampleZMWsTarget") || a.equals("szt")){
242 				sampleZMWsTarget=Parse.parseKMG(b);
243 			}else if(a.equals("bestpass") || a.equals("keepbestpass") ||
244 					a.equals("keepbest") || a.equals("bestpassonly")){
245 				keepBestPass=Parse.parseBoolean(b);
246 			}else if(a.equals("longestpass") || a.equals("keeplongestpass") ||
247 					a.equals("keeplongest") || a.equals("longestpassonly")){
248 				keepLongestPass=Parse.parseBoolean(b);
249 			}else if(a.equals("seed")){
250 				seed=Long.parseLong(b);
251 			}
252 
253 			else if(a.equalsIgnoreCase("zmws") || a.equalsIgnoreCase("maxzmws")){
254 				maxZMWs=Parse.parseKMG(b);
255 			}
256 
257 			else if(a.equals("whitelist")){
258 				whitelist=b;
259 			}else if(a.equals("whitelist")){
260 				blacklist=b;
261 			}
262 
263 			else if(a.equalsIgnoreCase("trimpolya")){
264 				trimPolyA=Parse.parseBoolean(b);
265 			}else if(PolymerTrimmer.parse(arg, a, b)){
266 				//do nothing
267 			}else if(a.equals("minentropy") || a.equals("entropy") || a.equals("entropyfilter") || a.equals("efilter")){
268 				if(b==null || Character.isLetter(b.charAt(0))){
269 					if(Parse.parseBoolean(b)){
270 						entropyCutoff=0.55f;
271 					}else{
272 						entropyCutoff=-1;
273 					}
274 				}else{
275 					entropyCutoff=Float.parseFloat(b);
276 				}
277 			}else if(a.equals("entropyblock") || a.equals("entropylength") || a.equals("entropylen") || a.equals("entlen")){
278 				entropyLength=Parse.parseIntKMG(b);
279 			}else if(a.equals("entropyfraction") || a.equals("entfraction")){
280 				entropyFraction=Float.parseFloat(b);
281 			}else if(a.equals("monomerfraction") || a.equals("maxmonomerfraction") || a.equals("mmf")){
282 				maxMonomerFraction=Float.parseFloat(b);
283 			}else if(a.equals("parse_flag_goes_here")){
284 				long fake_variable=Parse.parseKMG(b);
285 				//Set a variable here
286 			}else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
287 				//do nothing
288 			}else{
289 				outstream.println("Unknown parameter "+args[i]);
290 				assert(false) : "Unknown parameter "+args[i];
291 			}
292 		}
293 
294 		if(verbose){System.err.println("Finished parser for "+getClass().getName());}
295 		return parser;
296 	}
297 
298 	/** Add or remove .gz or .bz2 as needed */
fixExtensions()299 	private void fixExtensions(){
300 		in1=Tools.fixExtension(in1);
301 	}
302 
303 	/** Ensure files can be read and written */
checkFileExistence()304 	private void checkFileExistence(){
305 
306 		//Ensure output files can be written
307 		if(!Tools.testOutputFiles(overwrite, append, false, outg, outb, outstats, schist)){
308 			outstream.println((outg==null)+", "+(outb==null)+", "+outg+", "+outb+", "+outstats);
309 			throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+
310 					outg+", "+outb+", "+outstats+", "+schist+"\n");
311 		}
312 
313 		//Ensure input files can be read
314 		if(!Tools.testInputFiles(false, true, in1, whitelist, blacklist)){
315 			throw new RuntimeException("\nCan't read some input files.\n");
316 		}
317 
318 		//Ensure that no file was specified multiple times
319 		if(!Tools.testForDuplicateFiles(true, in1, outg, outb, outstats, schist, whitelist, blacklist)){
320 			throw new RuntimeException("\nSome file names were specified multiple times.\n");
321 		}
322 	}
323 
324 	/** Adjust file-related static fields as needed for this program */
checkStatics()325 	private static void checkStatics(){
326 		//Adjust the number of threads for input file reading
327 		if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
328 			ByteFile.FORCE_MODE_BF2=true;
329 		}
330 
331 		assert(FastaReadInputStream.settingsOK());
332 	}
333 
334 	/*--------------------------------------------------------------*/
335 	/*----------------         Outer Methods        ----------------*/
336 	/*--------------------------------------------------------------*/
337 
338 	/** Create read streams and process all data */
process(Timer t)339 	void process(Timer t){
340 		if(verbose){System.err.println("Entered process()");}
341 
342 		whiteSet=Tools.loadIntSet(whitelist);
343 		blackSet=Tools.loadIntSet(blacklist);
344 
345 		//Count reads in the original file
346 		if(sampleExact){countInitial();}
347 
348 		//Turn off read validation in the input threads to increase speed
349 		final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
350 		Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
351 
352 		//Create a read input stream
353 		ZMWStreamer zstream=new ZMWStreamer(ffin1, Shared.threads(), maxReads, maxZMWs);
354 
355 		//Optionally create read output streams
356 		final ConcurrentReadOutputStream rosg=makeCros(ffoutg);
357 		final ConcurrentReadOutputStream rosb=makeCros(ffoutb);
358 
359 		//Reset counters
360 		readsProcessed=readsOut=0;
361 		basesProcessed=basesOut=0;
362 
363 		//Process the reads in separate threads
364 		spawnThreads(zstream, rosg, rosb);
365 
366 		if(verbose){outstream.println("Finished; closing streams.");}
367 
368 		//Write anything that was accumulated by ReadStats
369 		errorState|=ReadStats.writeAll();
370 		//Close the read streams
371 		errorState|=ReadWrite.closeStreams(null, rosg, rosb);
372 
373 		//Reset read validation
374 		Read.VALIDATE_IN_CONSTRUCTOR=vic;
375 
376 		writeHistogram(ffschist, subreadCounts);
377 
378 		//Report timing and results
379 		t.stop();
380 
381 		String stats=null;
382 		if(format==FORMAT_TEXT){
383 			ByteBuilder bb=toText(t);
384 			stats=bb.nl().toString();
385 		}else if(format==FORMAT_JSON){
386 			JsonObject jo=toJson(t);
387 			stats=jo.toStringln();
388 		}else{
389 			assert(false) : "Bad format: "+format;
390 		}
391 		if(ffstats==null){
392 			outstream.print(stats);
393 		}else{
394 			ReadWrite.writeString(stats, outstats);
395 		}
396 
397 		//Throw an exception of there was an error in a thread
398 		if(errorState){
399 			throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
400 		}
401 	}
402 
403 	private ByteBuilder toText(Timer t){
404 		ByteBuilder bb=new ByteBuilder();
405 		bb.appendln(Tools.timeZMWsReadsBasesProcessed(t, ZMWs, readsProcessed, basesProcessed, 8));
406 		bb.appendln(Tools.ZMWsReadsBasesOut(ZMWs, readsProcessed, basesProcessed, ZMWsOut, readsOut, basesOut, 8, true));
407 
408 		long readsFiltered=readsProcessed-readsOut;
409 		bb.appendln(Tools.numberPercent("Reads Filtered:", readsFiltered, readsFiltered*100.0/(readsProcessed), 3, 8));
410 		if(trimReads || trimPolyA){
411 			bb.appendln(Tools.numberPercent("Reads Trimmed:", readsTrimmed, readsTrimmed*100.0/(readsProcessed), 3, 8));
412 			bb.appendln(Tools.numberPercent("Bases Trimmed:", basesTrimmed, basesTrimmed*100.0/(basesProcessed), 3, 8));
413 		}
414 //		bb.appendln(Tools.number("Total ZMWs:", ZMWs, 8));
415 		bb.appendln(Tools.numberPercent("Partial ZMWs:", partiallyDiscardedZMWs, partiallyDiscardedZMWs*100.0/(ZMWs), 3, 8));
416 		bb.appendln(Tools.numberPercent("Discarded ZMWs:", fullyDiscardedZMWs, fullyDiscardedZMWs*100.0/(ZMWs), 3, 8));
417 //		bb.appendln(Tools.numberPercent("Low Entropy:", lowEntropyReads, lowEntropyReads*100.0/(readsProcessed), 3, 8));
418 		if(entropyCutoff>=0){
419 			bb.appendln(Tools.numberPercent("Low Entropy:", lowEntropyZMWs, lowEntropyZMWs*100.0/(ZMWs), 3, 8));
420 		}
421 
422 		if(parseCustom){}
423 		return bb;
424 	}
425 
toJson(Timer t)426 	private JsonObject toJson(Timer t){
427 		JsonObject jo=new JsonObject();
428 		long readsFiltered=readsProcessed-readsOut;
429 
430 //		asdf
431 		jo.add("Time", t.timeInSeconds());
432 		jo.add("ZMWs_Processed", ZMWs);
433 		jo.add("Reads_Processed", readsProcessed);
434 		jo.add("Bases_Processed", basesProcessed);
435 		jo.add("Reads_Out", readsOut);
436 		jo.add("Bases_Out", basesOut);
437 		jo.add("ZMWs_Out", ZMWsOut);
438 		jo.add("Reads_Filtered", readsFiltered);
439 		jo.add("Reads_Filtered_Pct", readsFiltered*100.0/(readsProcessed));
440 		if(trimReads){
441 			jo.add("Reads_Trimmed", readsTrimmed);
442 			jo.add("Reads_Trimmed_Pct", readsTrimmed*100.0/(readsProcessed));
443 			jo.add("Bases_Trimmed", basesTrimmed);
444 			jo.add("Bases_Trimmed_Pct", basesTrimmed*100.0/(basesProcessed));
445 		}
446 //		jo.add("Total_ZMWs", ZMWs);fullyDiscardedZMWs
447 		jo.add("Partial_ZMWs", partiallyDiscardedZMWs);
448 		jo.add("Partial_ZMWs_Pct", partiallyDiscardedZMWs*100.0/(ZMWs));
449 		jo.add("Discarded_ZMWs", fullyDiscardedZMWs);
450 		jo.add("Discarded_ZMWs_Pct", fullyDiscardedZMWs*100.0/(ZMWs));
451 //		jo.add("Low_Entropy", lowEntropyReads);
452 //		jo.add("Low_Entropy_Pct", lowEntropyReads*100.0/(readsProcessed));
453 		if(entropyCutoff>=0){
454 			jo.add("Low_Entropy", lowEntropyZMWs);
455 			jo.add("Low_Entropy_Pct", lowEntropyZMWs*100.0/(ZMWs));
456 		}
457 		if(parseCustom){
458 			//Special stats if desired
459 		}
460 		return jo;
461 	}
462 
writeHistogram(FileFormat ff, long[] hist)463 	private static void writeHistogram(FileFormat ff, long[] hist){
464 		if(ff==null){return;}
465 		final ByteStreamWriter bsw=new ByteStreamWriter(ff);
466 		bsw.start();
467 
468 		bsw.print("#Counted\t").println(Tools.sum(hist));
469 		bsw.print("#Mean\t").println(Tools.averageHistogram(hist), 3);
470 		bsw.print("#Median\t").println(Tools.medianHistogram(hist));
471 		bsw.print("#Mode\t").println(Tools.calcModeHistogram(hist));
472 		bsw.print("#STDev\t").println(Tools.standardDeviationHistogram(hist), 3);
473 		bsw.print("#Value\tOccurances\n");
474 
475 		for(int i=0; i<hist.length; i++){
476 			bsw.print(i).tab().println(hist[i]);
477 		}
478 		bsw.poisonAndWait();
479 	}
480 
makeCros(FileFormat ff)481 	private ConcurrentReadOutputStream makeCros(FileFormat ff){
482 		if(ff==null){return null;}
483 
484 		//Select output buffer size based on whether it needs to be ordered
485 		final int buff=16;
486 
487 		final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(
488 				ff, null, buff, null, ff.samOrBam() && ffin1.samOrBam());
489 		ros.start(); //Start the stream
490 		return ros;
491 	}
492 
493 	/** Spawn process threads */
spawnThreads(final ZMWStreamer zstream, final ConcurrentReadOutputStream rosg, final ConcurrentReadOutputStream rosb)494 	private void spawnThreads(final ZMWStreamer zstream,
495 			final ConcurrentReadOutputStream rosg,
496 			final ConcurrentReadOutputStream rosb){
497 
498 		//Do anything necessary prior to processing
499 
500 		//Fill a list with ProcessThreads
501 		ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
502 		for(int i=0; i<threads; i++){
503 			alpt.add(new ProcessThread(zstream, rosg, rosb, i));
504 		}
505 
506 		//Start the threads
507 		for(ProcessThread pt : alpt){
508 			pt.start();
509 		}
510 
511 		zstream.runStreamer(false);
512 
513 		//Wait for threads to finish
514 		waitForThreads(alpt);
515 
516 		//Do anything necessary after processing
517 		ZMWs=zstream.ZMWs;
518 	}
519 
waitForThreads(ArrayList<ProcessThread> alpt)520 	private void waitForThreads(ArrayList<ProcessThread> alpt){
521 
522 		//Wait for completion of all threads
523 		boolean success=true;
524 		for(ProcessThread pt : alpt){
525 
526 			//Wait until this thread has terminated
527 			while(pt.getState()!=Thread.State.TERMINATED){
528 				try {
529 					//Attempt a join operation
530 					pt.join();
531 				} catch (InterruptedException e) {
532 					//Potentially handle this, if it is expected to occur
533 					e.printStackTrace();
534 				}
535 			}
536 
537 			//Accumulate per-thread statistics
538 			readsProcessed+=pt.readsProcessedT;
539 			basesProcessed+=pt.basesProcessedT;
540 			ZMWs+=pt.ZMWsT;
541 
542 			readsOut+=pt.readsOutT;
543 			basesOut+=pt.basesOutT;
544 			ZMWsOut+=pt.ZMWsOutT;
545 
546 			partiallyDiscardedZMWs+=pt.partiallyDiscardedZMWsT;
547 			fullyDiscardedZMWs+=pt.fullyDiscardedZMWsT;
548 			lowEntropyZMWs+=pt.lowEntropyZMWsT;
549 			lowEntropyReads+=pt.lowEntropyReadsT;
550 
551 			basesTrimmed+=pt.basesTrimmedT;
552 			readsTrimmed+=pt.readsTrimmedT;
553 
554 			success&=pt.success;
555 		}
556 
557 		//Track whether any threads failed
558 		if(!success){errorState=true;}
559 	}
560 
561 	/*--------------------------------------------------------------*/
562 	/*----------------          CCS Methods         ----------------*/
563 	/*--------------------------------------------------------------*/
564 
makeConsensus(ZMW zmw)565 	public Read makeConsensus(ZMW zmw){
566 		//Apply correct strand
567 		for(int i=0; i<zmw.size(); i++){
568 			zmw.get(i).setStrand(i&1);
569 		}
570 		final Read ref=zmw.medianRead(false);
571 		if(zmw.size()<3 || zmw.estimatePasses()<2.1){return ref;}
572 
573 		BaseGraph bg=new BaseGraph(ref.id, ref.bases, ref.quality, ref.numericID, 0);
574 		float idSum=0;
575 		int added=0;
576 		for(Read r : zmw){
577 			if(r!=ref){
578 				final boolean rcomp=(r.strand()!=ref.strand());
579 				if(rcomp){r.reverseComplement();}
580 				float id=shredAndAdd(r, bg, null);
581 				idSum+=id;
582 				added++;
583 				if(rcomp){r.reverseComplement();}
584 			}
585 		}
586 		float avgId=idSum/(Tools.max(1, added));
587 
588 		//TODO: Also interesting to get a traversal quality here...
589 		Read c=bg.traverse();
590 		c.obj=avgId;
591 
592 		//If not enough subreads aligned, discard the result
593 		if(minPasses>1 && bg.baseTotal/(float)ref.length()<minPasses-1){c.setDiscarded(true);}
594 		return c;
595 	}
596 
shredAndAdd(Read subread, BaseGraph bg, Aligner ssa)597 	private float shredAndAdd(Read subread, BaseGraph bg, Aligner ssa){
598 		int added=0;
599 		float idSum=0;
600 		if(ssa==null){ssa=GeneCaller.getSSA();}
601 		if(subread.length()<=shredLength){
602 			float id=bg.alignAndGenerateMatch(subread, ssa, findOrientation, minShredIdentity);
603 			added++;
604 			idSum+=id;
605 			if(id>=minShredIdentity){
606 				bg.add(subread);
607 			}
608 		}else{
609 			ArrayList<Read> shreds=shred(subread, shredLength, overlap);
610 			for(int i=0; i<shreds.size(); i++){
611 				Read shred=shreds.get(i);
612 				float id=bg.alignAndGenerateMatch(shred, ssa, findOrientation, minShredIdentity);
613 				added++;
614 				idSum+=id;
615 				if(id>=minShredIdentity){
616 					bg.add(shred, (i>0 ? overlap-1 : 0), 0);
617 				}
618 			}
619 		}
620 		float avgId=idSum/(Tools.max(1, added));
621 		return avgId;
622 	}
623 
shred(final Read r, final int shredLength, final int overlap)624 	private static ArrayList<Read> shred(final Read r, final int shredLength, final int overlap){
625 		final byte[] bases=r.bases;
626 		final byte[] quals=r.quality;
627 		final String name=r.id;
628 		final int increment=shredLength-overlap;
629 		final double incMult=1.0f/increment;
630 		final int chunks=bases.length<=shredLength ? 1 : (int)Math.ceil((bases.length-overlap)*incMult);
631 		assert(chunks>0);
632 		final double inc2=bases.length/(double)chunks;
633 
634 		final ArrayList<Read> list=new ArrayList<Read>(chunks);
635 		if(chunks==1){
636 			list.add(r);
637 			return list;
638 		}
639 
640 		for(int chunk=0; chunk<chunks; chunk++){
641 			int a=(int)Math.floor(inc2*chunk);
642 			int b=(chunk==chunks-1 ? bases.length : overlap+(int)Math.floor(inc2*(chunk+1)));
643 			b=Tools.min(b, a+shredLength);
644 			final int length=b-a;
645 //			if(length<minLength){return;}
646 			final byte[] bases2=KillSwitch.copyOfRange(bases, a, b);
647 			final byte[] quals2=(quals==null ? null : KillSwitch.copyOfRange(quals, a, b));
648 			Read shred=new Read(bases2, quals2, name+"_"+a+"-"+(b-1), r.numericID);
649 //			readsOut++;
650 //			basesOut+=shred.length();
651 			list.add(shred);
652 		}
653 		return list;
654 	}
655 
656 //	/** Generates match string and returns identity */
657 //	public float alignAndGenerateMatch(Read r, Aligner ssa){
658 //		if(ssa==null){ssa=GeneCaller.getSSA();}
659 //		byte[] query=r.bases;
660 //		int a=0, b=ref.length-1;
661 //		int[] max=ssa.fillUnlimited(query, original, a, b, -9999);
662 //		if(max==null){return 0;}
663 //
664 //		final int rows=max[0];
665 //		final int maxCol=max[1];
666 //		final int maxState=max[2];
667 //		final byte[] match=ssa.traceback(query, original, a, b, rows, maxCol, maxState);
668 //		int[] score=ssa.score(query, original, a, b, rows, maxCol, 0);
669 //		r.match=match;
670 //		r.start=score[1];
671 //		r.stop=score[2];
672 //		r.setMapped(true);
673 //		final float identity=Read.identity(match);
674 //		return identity;
675 //	}
676 
677 	/*--------------------------------------------------------------*/
678 	/*----------------         Inner Methods        ----------------*/
679 	/*--------------------------------------------------------------*/
680 
countInitial()681 	void countInitial(){
682 		if(verbose){System.err.println("Entered countInitial()");}
683 		assert(!ffin1.stdio()) : "Target subsampling can't be used with stdin.";
684 		initialReads=0;
685 		initialZMWs=0;
686 		initialBases=0;
687 		ZMWStreamer zstream=new ZMWStreamer(ffin1, Shared.threads(), maxReads, maxZMWs);
688 		zstream.runStreamer(true);
689 		for(ZMW zmw=zstream.nextZMW(); zmw!=null; zmw=zstream.nextZMW()){
690 			initialZMWs++;
691 			initialReads+=zmw.size();
692 			initialBases+=zmw.countBases();
693 		}
694 		readsRemaining=initialReads;
695 		ZMWsRemaining=initialZMWs;
696 		basesRemaining=initialBases;
697 		if(verbose){System.err.println("Finished countInitial()");}
698 	}
699 
700 	/*--------------------------------------------------------------*/
701 	/*----------------         Inner Classes        ----------------*/
702 	/*--------------------------------------------------------------*/
703 
704 	/** Processes reads. */
705 	private class ProcessThread extends Thread {
706 
707 		//Constructor
ProcessThread(final ZMWStreamer zstream_, final ConcurrentReadOutputStream ros_, final ConcurrentReadOutputStream rosb_, final int tid_)708 		ProcessThread(final ZMWStreamer zstream_,
709 				final ConcurrentReadOutputStream ros_,
710 				final ConcurrentReadOutputStream rosb_, final int tid_){
711 			zstream=zstream_;
712 			rosg=ros_;
713 			rosb=rosb_;
714 			tid=tid_;
715 
716 			if(entropyCutoff>=0){
717 				eTracker=new EntropyTracker(false, entropyCutoff, true);
718 			}else{
719 				eTracker=null;
720 			}
721 		}
722 
723 		//Called by start()
724 		@Override
run()725 		public void run(){
726 			//Do anything necessary prior to processing
727 			randy=Shared.threadLocalRandom(seed<0 ? seed : seed+tid);
728 			//Randy can't be initialized in the constructor because it is threadlocal;
729 			//that leads to nonrandom results.
730 
731 			//Process the reads
732 			processInner();
733 
734 			//Do anything necessary after processing
735 
736 			//Indicate successful exit status
737 			success=true;
738 		}
739 
740 		/** Iterate through the reads */
processInner()741 		void processInner(){
742 
743 			//As long as there is a nonempty read list...
744 			for(ZMW reads=zstream.nextZMW(); reads!=null; reads=zstream.nextZMW()){
745 //				if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
746 
747 				processList(reads);
748 			}
749 		}
750 
medianLength(ZMW list)751 		private int medianLength(ZMW list){
752 			if(list.size()<3){return -1;}
753 			IntList lengths=new IntList(list.size()-2);
754 
755 			for(int i=1; i<list.size()-1; i++){
756 				lengths.add(list.get(i).length());
757 			}
758 			lengths.sort();
759 			int median=lengths.get((lengths.size-1)/2);
760 			return median;
761 		}
762 
flagLowEntropyReads(final ZMW reads, final float minEnt, final int minLen0, final float minFract)763 		int flagLowEntropyReads(final ZMW reads, final float minEnt,
764 				final int minLen0, final float minFract){
765 			int found=0;
766 			for(Read r : reads){
767 				if(!r.discarded()){
768 					int minLen=Tools.min((int)(r.length()*minFract), minLen0);
769 					int maxBlock=eTracker.longestLowEntropyBlock(r.bases, false, maxMonomerFraction);
770 					if(maxBlock>=minLen){
771 						r.setDiscarded(true);
772 						r.setJunk(true);
773 						found++;
774 //						System.err.println(r.toFasta());
775 					}
776 				}
777 			}
778 			return found;
779 		}
780 
flagLongReads(final ZMW reads, int median)781 		int flagLongReads(final ZMW reads, int median){
782 			int found=0;
783 			for(Read r : reads){
784 				if(r.length()>longReadMult*median){
785 					r.setDiscarded(true);
786 					r.setHasAdapter(true);
787 					found++;
788 				}
789 			}
790 			return found;
791 		}
792 
discardEndReads(ZMW zmw, int toDiscard)793 		void discardEndReads(ZMW zmw, int toDiscard){
794 			if(toDiscard<1){return;}
795 			if(toDiscard==1){
796 				if(zmw.first().length()<zmw.last().length()){
797 					zmw.first().setDiscarded(true);
798 				}else{
799 					zmw.last().setDiscarded(true);
800 				}
801 				return;
802 			}else{
803 				zmw.first().setDiscarded(true);
804 				zmw.last().setDiscarded(true);
805 				for(int i=2; i<toDiscard; i++){
806 					zmw.get(zmw.size()-i).setDiscarded(true);
807 				}
808 			}
809 		}
810 
811 		/** Each list is presumed to be all reads from a ZMW, in order */
processList(final ZMW reads)812 		void processList(final ZMW reads){
813 			ZMWsT++;
814 			{
815 				int idx=Tools.min(subreadCounts.length-1, reads.size());
816 				synchronized(subreadCounts){
817 					//TODO: Slow
818 					//But other statistics could be gathered here as well
819 					subreadCounts[idx]++;
820 				}
821 			}
822 			long numBases=0;
823 
824 			//Loop through each read in the list for statistics
825 			for(int idx=0; idx<reads.size(); idx++){
826 				final Read r1=reads.get(idx);
827 
828 				//Validate reads in worker threads
829 				if(!r1.validated()){r1.validate(true);}
830 
831 				//Track the initial length for statistics
832 				final int initialLength1=r1.length();
833 
834 				//Increment counters
835 				readsProcessedT++;
836 				basesProcessedT+=initialLength1;
837 				numBases+=initialLength1;
838 			}
839 			final boolean fullPass=CCSInput || reads.size()>=3;
840 
841 			if(whiteSet!=null || blackSet!=null){
842 				int zid=reads.zid();
843 				assert(zid>=0) : "Can't parse ZMW ID from header "+reads.get(0).id;
844 				if(whiteSet!=null && !whiteSet.contains(zid)){reads.setDiscarded(true);}
845 				if(blackSet!=null && blackSet.contains(zid)){reads.setDiscarded(true);}
846 			}
847 
848 			final int subreads=reads.size();
849 			final float passes=reads.estimatePasses();
850 			if((minPasses>0 && passes<minPasses) || (minSubreads>0 && reads.size()<minSubreads)){
851 				reads.setDiscarded(true);
852 			}
853 
854 			if(samplerate<1){
855 				if(keepZMWsTogether){
856 					if(randy.nextFloat()>samplerate){
857 						for(Read r : reads){r.setDiscarded(true);}
858 					}
859 				}else if(subsampleFromEnds){
860 					//Roll a fraction of the reads
861 					int remove=0;
862 					for(Read r : reads){
863 						if(randy.nextFloat()>samplerate){
864 							remove++;
865 						}
866 					}
867 					discardEndReads(reads, remove);
868 				}else{
869 					for(Read r : reads){
870 						if(randy.nextFloat()>samplerate){
871 							r.setDiscarded(true);
872 						}
873 					}
874 				}
875 			}
876 
877 			if(keepBestPass || keepLongestPass){
878 				Read best=keepBestPass ? reads.medianRead(false) : reads.longestRead(false);
879 //				assert(reads.size()<5) : keepBestPass+", "+keepLongestPass+
880 //					", "+best.length()+", "+Arrays.toString(reads.lengths());
881 				for(Read r : reads){
882 					if(r!=best){
883 						r.setDiscarded(true);
884 					}
885 				}
886 			}else if(makeCCS && !reads.discarded()){
887 				Read r=makeConsensus(reads);
888 				reads.clear();
889 				reads.add(r);
890 				//TODO: Fix read header so that it looks like a CCS read.
891 				PBHeader pbh=new PBHeader(r.id);
892 				float avgId=(r.obj==null ? 0 : (Float)r.obj);
893 				r.id=pbh.runID+"/"+pbh.zmwID+"/ccs"
894 						+ "\tsubreads="+subreads+"\tpasses="+String.format("%.2f", passes)
895 						+"\tavgID="+String.format("%.4f", avgId);
896 
897 //				ZMW.fixReadHeader(r, 0, trimmed);
898 			}
899 
900 			if(sampleExact){
901 				if(sampleReadsExact){
902 					if(keepZMWsTogether){
903 						assert(readsRemaining>0) : readsRemaining;
904 						double prob=sampleReadsTarget/(double)(readsRemaining);
905 						if(randy.nextDouble()<prob){sampleReadsTarget-=reads.size();}
906 						else{reads.setDiscarded(true);}
907 						readsRemaining-=reads.size();
908 					}else{
909 						for(Read r : reads){
910 							if(r!=null && !r.discarded()){
911 								assert(readsRemaining>0) : readsRemaining;
912 								double prob=sampleReadsTarget/(double)(readsRemaining);
913 								if(randy.nextDouble()<prob){sampleReadsTarget--;}
914 								else{r.setDiscarded(true);}
915 							}
916 							readsRemaining--;
917 						}
918 					}
919 				}else if(sampleBasesExact){
920 					if(keepZMWsTogether){
921 						assert(basesRemaining>0) : basesRemaining;
922 						final long bases=reads.countBases();
923 						double prob=sampleBasesTarget/(double)(basesRemaining);
924 						if(randy.nextDouble()<prob){sampleBasesTarget-=bases;}
925 						else{reads.setDiscarded(true);}
926 						basesRemaining-=bases;
927 					}else{
928 						for(Read r : reads){
929 							if(r!=null && !r.discarded()){
930 								assert(basesRemaining>0) : basesRemaining;
931 								final int bases=r.length();
932 								double prob=sampleBasesTarget/(double)(basesRemaining);
933 								if(randy.nextDouble()<prob){sampleBasesTarget-=bases;}
934 								else{r.setDiscarded(true);}
935 								basesRemaining-=bases;
936 							}
937 						}
938 					}
939 				}else if(sampleZMWsExact){
940 					assert(ZMWsRemaining>0) : ZMWsRemaining;
941 					double prob=sampleZMWsTarget/(double)(ZMWsRemaining);
942 					if(randy.nextDouble()<prob){sampleZMWsTarget--;}
943 					else{reads.setDiscarded(true);}
944 					ZMWsRemaining--;
945 				}else{assert(false) : "No sampling mode.";}
946 			}
947 
948 			if(trimReads || trimPolyA){
949 				int removed=0;
950 				for(int i=0; i<reads.size(); i++){
951 					Read r=reads.get(i);
952 					if(!r.discarded()){
953 						byte a=r.bases[0], b=r.bases[r.length()-1];
954 						int trimmed=0;
955 						if(!AminoAcid.isFullyDefined(a) || !AminoAcid.isFullyDefined(b)){
956 							trimmed+=trimRead(r, 80);
957 						}
958 						if(trimPolyA){
959 							int x=trimPolyA(r);
960 							trimmed+=x;
961 							if(x>0){r.setTrimmed(true);}
962 						}
963 
964 						if(trimmed>0){
965 							basesTrimmedT+=trimmed;
966 							readsTrimmedT++;
967 							//TODO: technically this should track the lefty and right trim amounts
968 							//but it doesn't really matter as long as the sum is correct
969 							ZMW.fixReadHeader(r, 0, trimmed);
970 						}
971 
972 						//TODO: Note again, removing intermediate reads messes up forward-rev ordering
973 						if(r.length()<minLengthAfterTrimming){//Discard short trash
974 							//						reads.set(i, null);
975 							r.setDiscarded(true);
976 							removed++;
977 						}
978 					}
979 				}
980 			}
981 
982 			if(entropyCutoff>0){
983 				int bad=flagLowEntropyReads(reads, entropyCutoff, entropyLength, entropyFraction);
984 				if(bad>0){
985 					lowEntropyZMWsT++;
986 					lowEntropyReadsT+=bad;
987 					if(bad>=reads.size()){
988 						if(!reads.isEmpty()){outputReads(reads);}
989 						return; //No point in continuing...
990 					}
991 				}
992 			}
993 
994 			if(reads.isEmpty()){return;}
995 
996 			Read sample=null;//Read that will be searched for inverted repeat, typically median length
997 			Read shortest=null;//shortest read in the middle, or overall if no middle reads
998 			final int medianLength=medianLength(reads);
999 			int longReads=0;
1000 			int sampleNum=0;
1001 
1002 			if(reads.size()>=3){
1003 				if(flagLongReads){longReads=flagLongReads(reads, medianLength);}
1004 				for(int i=1; i<reads.size()-1; i++){
1005 					Read r=reads.get(i);
1006 					if(sample==null && r.length()==medianLength){
1007 						sample=r;
1008 						sampleNum=i;
1009 					}
1010 					if(shortest==null || r.length()<shortest.length()){shortest=r;}
1011 				}
1012 			}else{
1013 				for(int i=0; i<reads.size(); i++){
1014 					Read r=reads.get(i);
1015 					if(sample==null || sample.length()<r.length()){
1016 						sample=r;
1017 						sampleNum=i;
1018 					}
1019 					if(shortest==null || r.length()<shortest.length()){shortest=r;}
1020 				}
1021 			}
1022 			assert(sample!=null);
1023 
1024 			if(keepShortReads){
1025 
1026 			}else if(sample.discarded() || (longReads>0)){
1027 				for(Read r : reads){
1028 					r.setDiscarded(true);
1029 				}
1030 			}
1031 
1032 			if(minLengthAfterTrimming>0){removeShortTrash(reads);}
1033 
1034 			if(!reads.isEmpty()){outputReads(reads);}
1035 		}
1036 
removeShortTrash(ZMW reads)1037 		private int removeShortTrash(ZMW reads) {
1038 			int removed=0;
1039 			for(int i=0; i<reads.size(); i++){
1040 				Read r=reads.get(i);
1041 				if(r.length()<minLengthAfterTrimming){//Discard short trash
1042 					if(!r.discarded()){
1043 						removed++;
1044 						r.setDiscarded(true);
1045 					}
1046 				}
1047 			}
1048 			return removed;
1049 		}
1050 
fixReadHeader(Read r, int leftTrim, int rightTrim)1051 		private void fixReadHeader(Read r, int leftTrim, int rightTrim){
1052 			leftTrim=Tools.max(0, leftTrim);
1053 			rightTrim=Tools.max(0, rightTrim);
1054 			if(leftTrim<1 && rightTrim<1){return;}
1055 			final int idx=r.id.lastIndexOf('/');
1056 			if(idx>0 && idx<r.id.length()-3){
1057 				String prefix=r.id.substring(0, idx+1);
1058 				String suffix=r.id.substring(idx+1);
1059 				if(suffix.indexOf('_')>0){
1060 					String coords=suffix, comment="";
1061 					int tab=suffix.indexOf('\t');
1062 					if(tab<0){tab=suffix.indexOf(' ');}
1063 					if(tab>0){
1064 						coords=coords.substring(0, tab);
1065 						comment=coords.substring(tab);
1066 					}
1067 					String[] split=Tools.underscorePattern.split(coords);
1068 					int left=Integer.parseInt(split[0]);
1069 					int right=Integer.parseInt(split[1]);
1070 					left+=leftTrim;
1071 					right-=rightTrim;
1072 					if(left>right){left=right;}
1073 
1074 					if(right-left!=r.length()){right=left+r.length();}
1075 //					System.err.println(r.length()+", "+(right-left));
1076 
1077 					r.id=prefix+left+"_"+right+comment;
1078 					final SamLine sl=r.samline;
1079 					if(sl!=null){
1080 						sl.qname=r.id;
1081 						if(sl.optional!=null){
1082 							for(int i=0; i<sl.optional.size(); i++){
1083 								String s=sl.optional.get(i);
1084 								if(s.startsWith("qe:i:")){
1085 									s="qe:i:"+right;
1086 									sl.optional.set(i, s);
1087 								}else if(s.startsWith("qs:i:")){
1088 									s="qs:i:"+left;
1089 									sl.optional.set(i, s);
1090 								}
1091 							}
1092 						}
1093 					}
1094 				}
1095 			}
1096 		}
1097 
trimRead(Read r, int lookahead)1098 		int trimRead(Read r, int lookahead){
1099 			final byte[] bases=r.bases;
1100 
1101 			int left=calcLeftTrim(bases, lookahead);
1102 			int right=calcRightTrim(bases, lookahead);
1103 			int trimmed=0;
1104 
1105 			if(left+right>0){
1106 //				System.err.println(r.length()+", "+left+", "+right+", "+r.id);
1107 				trimmed=TrimRead.trimByAmount(r, left, right, 1, false);
1108 //				System.err.println(r.length()+", "+left+", "+right+", "+r.id);
1109 				fixReadHeader(r, left, right);
1110 //				System.err.println(r.length()+", "+left+", "+right+", "+r.id);
1111 			}
1112 
1113 			if(r.samline!=null){
1114 				r.samline.seq=r.bases;
1115 				r.samline.qual=r.quality;
1116 			}
1117 			return trimmed;
1118 		}
1119 
trimPolyA(Read r)1120 		int trimPolyA(Read r){
1121 			final byte[] bases=r.bases;
1122 
1123 			int left=Tools.max(PolymerTrimmer.testLeft(bases, 'A'), PolymerTrimmer.testLeft(bases, 'T'));
1124 			int right=Tools.max(PolymerTrimmer.testRight(bases, 'A'), PolymerTrimmer.testRight(bases, 'T'));
1125 			int trimmed=0;
1126 
1127 			if(left+right>0){
1128 //				System.err.println(r.length()+", "+left+", "+right+", "+r.id);
1129 				trimmed=TrimRead.trimByAmount(r, left, right, 1, false);
1130 //				System.err.println(r.length()+", "+left+", "+right+", "+r.id);
1131 				fixReadHeader(r, left, right);
1132 //				System.err.println(r.length()+", "+left+", "+right+", "+r.id);
1133 			}
1134 
1135 			if(r.samline!=null){
1136 				r.samline.seq=r.bases;
1137 				r.samline.qual=r.quality;
1138 			}
1139 			return trimmed;
1140 		}
1141 
calcLeftTrim(final byte[] bases, int lookahead)1142 		final int calcLeftTrim(final byte[] bases, int lookahead){
1143 			final int len=bases.length;
1144 			int lastUndef=-1;
1145 			for(int i=0, defined=0; i<len && defined<lookahead; i++){
1146 				if(AminoAcid.isFullyDefined(bases[i])){
1147 					defined++;
1148 				}else{
1149 					lastUndef=i;
1150 					defined=0;
1151 				}
1152 			}
1153 			return lastUndef+1;
1154 		}
1155 
calcRightTrim(final byte[] bases, int lookahead)1156 		final int calcRightTrim(final byte[] bases, int lookahead){
1157 			final int len=bases.length;
1158 			int lastUndef=len;
1159 			for(int i=len-1, defined=0; i>=0 && defined<lookahead; i--){
1160 				if(AminoAcid.isFullyDefined(bases[i])){
1161 					defined++;
1162 				}else{
1163 					lastUndef=i;
1164 					defined=0;
1165 				}
1166 			}
1167 			return len-lastUndef-1;
1168 		}
1169 
outputReads(ZMW reads)1170 		private void outputReads(ZMW reads){
1171 			final int size=reads.size();
1172 			final ArrayList<Read> good=new ArrayList<Read>(size);
1173 			final ArrayList<Read> bad=new ArrayList<Read>(size);
1174 //			final ArrayList<Read> bad=(rosb==null ? null : new ArrayList<Read>(size));
1175 
1176 			int discardedReads=0;
1177 			int trimmedReads=0;
1178 			boolean sendAllToDiscarded=false;
1179 
1180 			//Check to see if any reads are discarded or ambiguous
1181 			for(Read r : reads){
1182 				if(r.discarded()){
1183 					discardedReads++;
1184 				}else if(r.trimmed()){
1185 					trimmedReads++;
1186 				}
1187 			}
1188 
1189 			//Unify flags on all reads
1190 			if(keepZMWsTogether){
1191 				if(discardedReads>0){sendAllToDiscarded=true;}
1192 			}
1193 			if(discardedReads>0){
1194 				if(discardedReads==size){
1195 					fullyDiscardedZMWsT++;
1196 				}else{
1197 					partiallyDiscardedZMWsT++;
1198 				}
1199 			}
1200 
1201 			for(Read r : reads){
1202 				if(r.discarded() || sendAllToDiscarded){
1203 //					assert(false);
1204 					if(bad!=null){bad.add(r);}
1205 //					System.err.println("d:t="+r.tested()+",ad="+r.hasAdapter()+",ir="+r.invertedRepeat()+","+r.id+", reads="+reads.size());
1206 				}else{
1207 					if(good!=null){
1208 						good.add(r);
1209 					}
1210 					readsOutT++;
1211 					basesOutT+=r.length();
1212 				}
1213 			}
1214 
1215 			if(good!=null && !good.isEmpty()){ZMWsOutT++;}
1216 			if(rosg!=null && good!=null && !good.isEmpty()){rosg.add(good, 0);}
1217 			if(rosb!=null && bad!=null && !bad.isEmpty()){rosb.add(bad, 0);}
1218 		}
1219 
1220 		/*--------------------------------------------------------------*/
1221 		/*----------------      Inner Class Fields      ----------------*/
1222 		/*--------------------------------------------------------------*/
1223 
1224 		/** Number of reads processed by this thread */
1225 		protected long readsProcessedT=0;
1226 		/** Number of bases processed by this thread */
1227 		protected long basesProcessedT=0;
1228 		/** Number of ZMWs processed by this thread */
1229 		protected long ZMWsT=0;
1230 
1231 		protected long basesTrimmedT=0;
1232 		protected long readsTrimmedT=0;
1233 
1234 		/** Number of reads retained by this thread */
1235 		protected long readsOutT=0;
1236 		/** Number of bases retained by this thread */
1237 		protected long basesOutT=0;
1238 		/** Number of ZMWs retained by this thread */
1239 		protected long ZMWsOutT=0;
1240 
1241 		protected long partiallyDiscardedZMWsT=0;
1242 		protected long fullyDiscardedZMWsT=0;
1243 
1244 		protected long lowEntropyZMWsT=0;
1245 		protected long lowEntropyReadsT=0;
1246 
1247 		/** True only if this thread has completed successfully */
1248 		boolean success=false;
1249 
1250 		/** Shared output stream */
1251 		private final ConcurrentReadOutputStream rosg;
1252 		/** Shared output stream for bad reads */
1253 		private final ConcurrentReadOutputStream rosb;
1254 		/** Thread ID */
1255 		final int tid;
1256 
1257 		final ZMWStreamer zstream;
1258 		final EntropyTracker eTracker;
1259 		Random randy;
1260 	}
1261 
1262 	/*--------------------------------------------------------------*/
1263 	/*----------------            Fields            ----------------*/
1264 	/*--------------------------------------------------------------*/
1265 
1266 	/** Primary input file path */
1267 	private String in1=null;
1268 
1269 	/** Primary output file path */
1270 	private String outg=null;
1271 	/** Bad output file path */
1272 	private String outb=null;
1273 
1274 	/** Stats (screen) output file path */
1275 	private String outstats=null;
1276 
1277 	/** Subread count histogram */
1278 	private String schist=null;
1279 
1280 	/** Override input file extension */
1281 	private String extin=null;
1282 	/** Override output file extension */
1283 	private String extout=null;
1284 
1285 	private float longReadMult=1.5f;
1286 
1287 	/** For grading synthetic data */
1288 	private boolean parseCustom;
1289 
1290 	/** Input reads are CCS (full pass) */
1291 	private boolean CCSInput;
1292 
1293 	//Note: These flags are very similar... they need to be better-defined or merged.
1294 	private boolean keepZMWsTogether=false;
1295 	private boolean keepShortReads=true;
1296 
1297 	private boolean subsampleFromEnds=false;
1298 
1299 	private int format=FORMAT_TEXT;
1300 	private static final int FORMAT_TEXT=1, FORMAT_JSON=2;
1301 
1302 	/*--------------------------------------------------------------*/
1303 
1304 	/** Number of reads processed */
1305 	protected long readsProcessed=0;
1306 	/** Number of bases processed */
1307 	protected long basesProcessed=0;
1308 
1309 	/** Number of reads retained */
1310 	protected long readsOut=0;
1311 	/** Number of bases retained */
1312 	protected long basesOut=0;
1313 	/** Number of ZMWs retained */
1314 	protected long ZMWsOut=0;
1315 
1316 	protected long partiallyDiscardedZMWs=0;
1317 	protected long fullyDiscardedZMWs=0;
1318 
1319 	/** Quit after processing this many input reads; -1 means no limit */
1320 	private long maxReads=-1;
1321 
1322 	protected long basesTrimmed=0;
1323 	protected long readsTrimmed=0;
1324 
1325 	protected long lowEntropyZMWs=0;
1326 	protected long lowEntropyReads=0;
1327 
1328 	/** Total ZMWs observed */
1329 	protected long ZMWs=0;
1330 
1331 	/** Histogram */
1332 	protected final long[] subreadCounts=new long[101];
1333 
1334 	private boolean flagLongReads=false;
1335 	private boolean trimReads=false;
1336 	private int minLengthAfterTrimming=0;
1337 
1338 	boolean trimPolyA=false;
1339 
1340 	/*--------------------------------------------------------------*/
1341 	/*----------------      Subsampling Fields      ----------------*/
1342 	/*--------------------------------------------------------------*/
1343 
1344 	//If a target is chosen, these will be initialized with the original counts
1345 	long initialReads=0;
1346 	long initialZMWs=0;
1347 	long initialBases=0;
1348 
1349 	long readsRemaining=0;
1350 	long ZMWsRemaining=0;
1351 	long basesRemaining=0;
1352 
1353 	float samplerate=1.0f;
1354 	long sampleReadsTarget=-1;
1355 	long sampleBasesTarget=-1;
1356 	long sampleZMWsTarget=-1;
1357 
1358 	final boolean sampleReadsExact;
1359 	final boolean sampleBasesExact;
1360 	final boolean sampleZMWsExact;
1361 	final boolean sampleExact;
1362 
1363 	boolean keepBestPass=false;
1364 	boolean keepLongestPass=false;
1365 
1366 	/*--------------------------------------------------------------*/
1367 	/*----------------        Subset Fields         ----------------*/
1368 	/*--------------------------------------------------------------*/
1369 
1370 	String whitelist;
1371 	String blacklist;
1372 
1373 	IntHashSet whiteSet;
1374 	IntHashSet blackSet;
1375 
1376 	/*--------------------------------------------------------------*/
1377 	/*----------------        Entropy Fields        ----------------*/
1378 	/*--------------------------------------------------------------*/
1379 
1380 	/** Minimum entropy to be considered "complex", on a scale of 0-1 */
1381 	float entropyCutoff=-1; //suggested 0.55
1382 	/** Minimum length of a low-entropy block to fail a read */
1383 	int entropyLength=350;
1384 	/** Minimum length of a low-entropy block as a fraction of read length;
1385 	 * the minimum of the two will be used */
1386 	float entropyFraction=0.5f;
1387 
1388 	float maxMonomerFraction=0.74f; //Suggested...  0.74
1389 
1390 	/*--------------------------------------------------------------*/
1391 	/*----------------         Final Fields         ----------------*/
1392 	/*--------------------------------------------------------------*/
1393 
1394 	/** Primary input file */
1395 	private final FileFormat ffin1;
1396 
1397 	/** Primary output file */
1398 	private final FileFormat ffoutg;
1399 
1400 	/** Bad output file */
1401 	private final FileFormat ffoutb;
1402 
1403 	/** Stats output file */
1404 	private final FileFormat ffstats;
1405 
1406 	/** Subread count histogram */
1407 	private final FileFormat ffschist;
1408 
1409 	private final int threads;
1410 
1411 	private long seed=-1;
1412 	private long maxZMWs=-1;
1413 
1414 	private int shredLength=500;
1415 	private int overlap=10;//Helps make the shreds concur at their borders.
1416 	private float minShredIdentity=0.6f;
1417 	private boolean makeCCS=false;
1418 	private boolean findOrientation=false;
1419 
1420 	private float minPasses=0;
1421 	private int minSubreads=0;
1422 
1423 	/*--------------------------------------------------------------*/
1424 	/*----------------        Common Fields         ----------------*/
1425 	/*--------------------------------------------------------------*/
1426 
1427 	/** Print status messages to this output stream */
1428 	private PrintStream outstream=System.err;
1429 	/** Print verbose messages */
1430 	public static boolean verbose=false;
1431 	/** True if an error was encountered */
1432 	public boolean errorState=false;
1433 	/** Overwrite existing output files */
1434 	private boolean overwrite=false;
1435 	/** Append to existing output files */
1436 	private boolean append=false;
1437 
1438 }
1439