1 package consensus;
2 
3 import java.io.PrintStream;
4 import java.util.ArrayList;
5 import java.util.LinkedHashMap;
6 import java.util.Map.Entry;
7 
8 import fileIO.ByteFile;
9 import fileIO.ByteStreamWriter;
10 import fileIO.FileFormat;
11 import fileIO.ReadWrite;
12 import prok.ProkObject;
13 import shared.Parse;
14 import shared.Parser;
15 import shared.PreParser;
16 import shared.ReadStats;
17 import shared.Shared;
18 import shared.Timer;
19 import shared.Tools;
20 import sketch.SketchObject;
21 import stream.ConcurrentReadInputStream;
22 import stream.ConcurrentReadOutputStream;
23 import stream.FASTQ;
24 import stream.FastaReadInputStream;
25 import stream.Read;
26 import stream.SamLine;
27 import stream.SamReadStreamer;
28 import stream.SamStreamer;
29 import structures.ByteBuilder;
30 import structures.ListNum;
31 import template.Accumulator;
32 import template.ThreadWaiter;
33 import var2.Realigner;
34 import var2.SamFilter;
35 
36 /**
37  * Alters a reference to represent the consensus of aligned reads.
38  *
39  * @author Brian Bushnell
40  * @date September 6, 1019
41  *
42  */
43 public class ConsensusMaker extends ConsensusObject implements Accumulator<ConsensusMaker.ProcessThread> {
44 
45 	/*--------------------------------------------------------------*/
46 	/*----------------        Initialization        ----------------*/
47 	/*--------------------------------------------------------------*/
48 
49 	/**
50 	 * Code entrance from the command line.
51 	 * @param args Command line arguments
52 	 */
main(String[] args)53 	public static void main(String[] args){
54 		//Start a timer immediately upon code entrance.
55 		Timer t=new Timer();
56 
57 		//Create an instance of this class
58 		ConsensusMaker x=new ConsensusMaker(args);
59 
60 		//Run the object
61 		x.process(t);
62 
63 		//Close the print stream if it was redirected
64 		Shared.closeStream(x.outstream);
65 	}
66 
67 	/**
68 	 * Constructor.
69 	 * @param args Command line arguments
70 	 */
ConsensusMaker(String[] args)71 	public ConsensusMaker(String[] args){
72 
73 		{//Preparse block for help, config files, and outstream
74 			PreParser pp=new PreParser(args, getClass(), false);
75 			args=pp.args;
76 			outstream=pp.outstream;
77 		}
78 
79 		//Set shared static variables prior to parsing
80 		ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
81 		ReadWrite.MAX_ZIP_THREADS=Shared.threads();
82 		FASTQ.TEST_INTERLEAVED=FASTQ.FORCE_INTERLEAVED=false;
83 
84 		samFilter.includeUnmapped=false;
85 //		samFilter.includeSupplimentary=false;
86 //		samFilter.includeDuplicate=false;
87 		samFilter.includeNonPrimary=false;
88 		samFilter.includeQfail=false;
89 //		samFilter.minMapq=4;
90 
91 		{//Parse the arguments
92 			final Parser parser=parse(args);
93 
94 			Parser.processQuality();
95 
96 			maxReads=parser.maxReads;
97 			overwrite=ReadStats.overwrite=parser.overwrite;
98 			append=ReadStats.append=parser.append;
99 
100 			in=parser.in1;
101 			extin=parser.extin;
102 
103 			out=parser.out1;
104 			extout=parser.extout;
105 			silent=Parser.silent;
106 		}
107 
108 		{
109 //			if("auto".equalsIgnoreCase(atomic)){Scaffold.setCA3A(Shared.threads()>8);}
110 //			else{Scaffold.setCA3A(Parse.parseBoolean(atomic));}
111 
112 			if(ploidy<1){System.err.println("WARNING: ploidy not set; assuming ploidy=1."); ploidy=1;}
113 			samFilter.setSamtoolsFilter();
114 
115 			streamerThreads=Tools.max(1, Tools.min(streamerThreads, Shared.threads()));
116 			assert(streamerThreads>0) : streamerThreads;
117 		}
118 
119 		validateParams();
120 		fixExtensions(); //Add or remove .gz or .bz2 as needed
121 		checkFileExistence(); //Ensure files can be read and written
122 		checkStatics(); //Adjust file-related static fields as needed for this program
123 
124 		//Create output FileFormat objects
125 		ffout=FileFormat.testOutput(out, FileFormat.FASTA, extout, true, overwrite, append, ordered);
126 		ffmodel=FileFormat.testOutput(outModel, FileFormat.ALM, null, true, overwrite, false, ordered);
127 
128 		//Create input FileFormat objects
129 		ffin=FileFormat.testInput(in, FileFormat.SAM, extin, true, true);
130 		ffref=FileFormat.testInput(ref, FileFormat.FASTA, null, true, true);
131 
132 		if(inModelFile!=null){
133 			ArrayList<BaseGraph> list=(ArrayList<BaseGraph>)ReadWrite.readObject(inModelFile, false);
134 			inModel=list.get(0);
135 			inModel.calcProbs();
136 			inModel.makeWeights();
137 		}
138 	}
139 
140 	/*--------------------------------------------------------------*/
141 	/*----------------    Initialization Helpers    ----------------*/
142 	/*--------------------------------------------------------------*/
143 
144 	/** Parse arguments from the command line */
parse(String[] args)145 	private Parser parse(String[] args){
146 
147 		//Create a parser object
148 		Parser parser=new Parser();
149 
150 		//Set any necessary Parser defaults here
151 		//parser.foo=bar;
152 
153 		//Parse each argument
154 		for(int i=0; i<args.length; i++){
155 			String arg=args[i];
156 
157 			//Break arguments into their constituent parts, in the form of "a=b"
158 			String[] split=arg.split("=");
159 			String a=split[0].toLowerCase();
160 			String b=split.length>1 ? split[1] : null;
161 			if(b!=null && b.equalsIgnoreCase("null")){b=null;}
162 
163 			if(a.equals("verbose")){
164 				verbose=Parse.parseBoolean(b);
165 				BaseGraph.verbose=verbose;
166 			}else if(a.equals("ref")){
167 				ref=b;
168 			}else if(a.equals("outm") || a.equals("outmodel") || a.equals("model") || a.equals("alm")){
169 				outModel=b;
170 			}else if(a.equals("inm") || a.equals("inmodel")){
171 				inModelFile=b;
172 			}else if(a.equals("hist") || a.equals("histogram")){
173 				outHistFile=b;
174 			}else if(a.equals("realign")){
175 				realign=Parse.parseBoolean(b);
176 			}else if(a.equals("printscores")){
177 				printScores=Parse.parseBoolean(b);
178 			}else if(a.equalsIgnoreCase("useMapq")){
179 				useMapq=Parse.parseBoolean(b);
180 			}else if(a.equalsIgnoreCase("identityCeiling") || a.equalsIgnoreCase("idceiling") || a.equalsIgnoreCase("ceiling")){
181 				double d=Double.parseDouble(b);
182 				if(d<=2){d=d*100;}
183 				identityCeiling=(int)d;
184 				invertIdentity=true;
185 			}else if(a.equalsIgnoreCase("invertIdentity")){
186 				invertIdentity=Parse.parseBoolean(b);
187 			}else if(a.equals("name") || a.equals("rename") || a.equals("header")){
188 				name=b;
189 			}else if(a.equals("noindels")){
190 				noIndels=Parse.parseBoolean(b);
191 			}else if(a.equalsIgnoreCase("onlyConvertNs") || a.equalsIgnoreCase("nOnly") || a.equalsIgnoreCase("onlyN")){
192 				onlyConvertNs=Parse.parseBoolean(b);
193 			}else if(a.equals("ploidy")){
194 				ploidy=Integer.parseInt(b);
195 			}else if(a.equals("mindepth")){
196 				minDepth=Integer.parseInt(b);
197 			}else if(a.equals("trimdepth") || a.equals("trimdepthfraction")){
198 				trimDepthFraction=Float.parseFloat(b);
199 			}else if(a.equalsIgnoreCase("trimNs")){
200 				trimNs=Parse.parseBoolean(b);
201 			}else if(a.equalsIgnoreCase("mafn") || a.equals("mafnoref")){
202 				MAF_noref=Float.parseFloat(b);
203 			}else if(a.equalsIgnoreCase("mafsub")){
204 				MAF_sub=Float.parseFloat(b);
205 			}else if(a.equalsIgnoreCase("mafdel")){
206 				MAF_del=Float.parseFloat(b);
207 			}else if(a.equalsIgnoreCase("mafins")){
208 				MAF_ins=Float.parseFloat(b);
209 			}else if(a.equalsIgnoreCase("maf")){
210 				MAF_ins=MAF_noref=MAF_del=MAF_sub=Float.parseFloat(b);
211 			}else if(a.equalsIgnoreCase("mafindel")){
212 				MAF_ins=MAF_del=Float.parseFloat(b);
213 			}else if(a.equals("clearfilters")){
214 				if(Parse.parseBoolean(b)){
215 					samFilter.clear();
216 				}
217 			}else if(a.equals("parse_flag_goes_here")){
218 				long fake_variable=Parse.parseKMG(b);
219 				//Set a variable here
220 			}else if(samFilter.parse(arg, a, b)){
221 				//do nothing
222 			}else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
223 				//do nothing
224 			}else{
225 				outstream.println("Unknown parameter "+args[i]);
226 				assert(false) : "Unknown parameter "+args[i];
227 			}
228 		}
229 
230 		if(ref!=null && !Tools.existsInput(ref)){
231 			specialRef=ProkObject.isSpecialType(ref);
232 		}
233 
234 		return parser;
235 	}
236 
237 	/** Add or remove .gz or .bz2 as needed */
fixExtensions()238 	private void fixExtensions(){
239 		in=Tools.fixExtension(in);
240 		if(!specialRef){ref=Tools.fixExtension(ref);}
241 	}
242 
243 	/** Ensure files can be read and written */
checkFileExistence()244 	private void checkFileExistence(){
245 
246 		//Ensure there is an input file
247 		if(in==null){throw new RuntimeException("Error - an input file is required.");}
248 
249 		//Ensure there is an input file
250 		if(ref==null){throw new RuntimeException("Error - a reference file is required.");}
251 
252 		//Ensure output files can be written
253 		if(!Tools.testOutputFiles(overwrite, append, false, out, outModel, outHistFile)){
254 			outstream.println((out==null)+", "+out);
255 			throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out+" or "+outModel+"\n");
256 		}
257 
258 		//Ensure input files can be read
259 		if(!Tools.testInputFiles(true, true, in, inModelFile, (specialRef ? null : ref))){
260 			throw new RuntimeException("\nCan't read some input files.\n");
261 		}
262 
263 		//Ensure that no file was specified multiple times
264 		if(!Tools.testForDuplicateFiles(true, in, ref, out, outModel, outHistFile, inModelFile)){
265 			throw new RuntimeException("\nSome file names were specified multiple times.\n");
266 		}
267 	}
268 
269 	/** Adjust file-related static fields as needed for this program */
checkStatics()270 	private static void checkStatics(){
271 		//Adjust the number of threads for input file reading
272 		if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
273 			ByteFile.FORCE_MODE_BF2=true;
274 		}
275 
276 		assert(FastaReadInputStream.settingsOK());
277 	}
278 
279 	/** Ensure parameter ranges are within bounds and required parameters are set */
validateParams()280 	private boolean validateParams(){
281 //		assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
282 		return true;
283 	}
284 
writeHist(String fname)285 	private void writeHist(String fname){
286 		ByteStreamWriter bsw=new ByteStreamWriter(fname, overwrite, append, false);
287 		bsw.start();
288 		bsw.print("#Value\tIdentity");
289 		if(inModel!=null){
290 			bsw.print("\tScore");
291 		}
292 		bsw.nl();
293 		for(int i=0; i<idHist.length; i++){
294 			bsw.print(0.01f*i, 2).tab().print(idHist[i]);
295 			if(inModel!=null){
296 				bsw.tab().print(scoreHist[i]);
297 			}
298 			bsw.nl();
299 		}
300 		errorState=bsw.poisonAndWait()|errorState;
301 	}
302 
303 	/*--------------------------------------------------------------*/
304 	/*----------------         Outer Methods        ----------------*/
305 	/*--------------------------------------------------------------*/
306 
307 	/** Create read streams and process all data */
process(Timer t)308 	void process(Timer t){
309 
310 		//Turn off read validation in the input threads to increase speed
311 		final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
312 		Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
313 
314 		//Load reference;
315 		refMap=loadReferenceCustom();
316 		makeRefMap2();
317 
318 		//Reset counters
319 		readsProcessed=readsOut=0;
320 		basesProcessed=basesOut=0;
321 		alignedReads=0;
322 
323 		if(ffin.samOrBam()){
324 			//Create a read input stream
325 			final SamStreamer ss=makeStreamer(ffin);
326 			//Process the reads in separate threads
327 			spawnThreads(ss);
328 		}else{
329 			Shared.capBufferLen(40);
330 			//Create a read input stream
331 			final ConcurrentReadInputStream cris=makeCris(ffin);
332 			//Process the reads in separate threads
333 			spawnThreads(cris);
334 		}
335 
336 		//Optionally create a read output stream
337 		final ConcurrentReadOutputStream ros=makeCros();
338 
339 		outputConsensus(ros);
340 
341 		if(outHistFile!=null){
342 			writeHist(outHistFile);
343 		}
344 
345 		if(verbose){outstream.println("Finished; closing streams.");}
346 
347 		//Write anything that was accumulated by ReadStats
348 		errorState|=ReadStats.writeAll();
349 		//Close the read streams
350 		errorState|=ReadWrite.closeStream(ros);
351 
352 		//Reset read validation
353 		Read.VALIDATE_IN_CONSTRUCTOR=vic;
354 
355 		//Report timing and results
356 		t.stop();
357 		if(!silent){
358 			outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
359 			outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
360 			outstream.println();
361 		}
362 		outstream.println(Tools.number("Ref Count:", refCount, 8));
363 		outstream.println(Tools.number("Sub Count:", subCount, 8));
364 		outstream.println(Tools.number("Del Count:", delCount, 8));
365 		outstream.println(Tools.number("Ins Count:", insCount, 8));
366 		outstream.println(Tools.number("Avg Identity:", 100*identitySum/alignedReads, 3, 8));
367 		if(scoreSum>0){
368 			outstream.println(Tools.number("Avg Score:", 100*scoreSum/alignedReads, 3, 8));
369 		}
370 
371 		//Throw an exception of there was an error in a thread
372 		if(errorState){
373 			throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
374 		}
375 	}
376 
377 	private synchronized LinkedHashMap<String, BaseGraph> loadReferenceCustom(){
378 		assert(!loadedRef);
379 		if(specialRef){return loadReferenceSpecial();}
380 		ConcurrentReadInputStream cris=makeRefCris();
381 		LinkedHashMap<String, BaseGraph> map=new LinkedHashMap<String, BaseGraph>();
382 		ListNum<Read> ln=cris.nextList();
383 		for(; ln!=null && !ln.isEmpty(); ln=cris.nextList()){
384 			if(verbose){outstream.println("Fetched "+ln.size()+" reference sequences.");}
385 			for(Read r : ln){
386 				if(r.bases!=null && r.bases.length>0){
387 					BaseGraph bg=new BaseGraph(r.id, r.bases, r.quality, r.numericID, 0);
388 					map.put(r.name(), bg);
389 //					if(inModel!=null && inModel.name.equals(bg.name)){
390 ////						assert(false) : Arrays.toString(bg.refWeights)+"\n"+Arrays.toString(inModel.refWeights);
391 //						bg.refWeights=inModel.refWeights;
392 //						bg.insWeights=inModel.insWeights;
393 //						bg.delWeights=inModel.delWeights;
394 //					}else{
395 //						bg.insWeights=new float[bg.ref.length];
396 //						bg.delWeights=new float[bg.ref.length];
397 //						Arrays.fill(bg.insWeights, 1);
398 //						Arrays.fill(bg.delWeights, 1);
399 ////						assert(false) : Arrays.toString(bg.delWeights);
400 //					}
401 				}
402 			}
403 			if(ln!=null){
404 				cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
405 			}
406 		}
407 		if(verbose){outstream.println("Loaded "+map.size()+" reference sequences.");}
408 		loadedRef=true;
409 		return map;
410 	}
411 
412 	//For ribo subunits in resources directory
413 	private synchronized LinkedHashMap<String, BaseGraph> loadReferenceSpecial(){
414 		assert(!loadedRef);
415 		Read[] array=ProkObject.loadConsensusSequenceType(ref, false, false);
416 		Read r=array[0];
417 		BaseGraph bg=new BaseGraph(r.id, r.bases, r.quality, r.numericID, 0);
418 		LinkedHashMap<String, BaseGraph> map=new LinkedHashMap<String, BaseGraph>();
419 		map.put(r.name(), bg);
420 		return map;
421 	}
422 
423 	private synchronized void makeRefMap2(){
424 		assert(refMap!=null && refMap2==null);
425 		if(verbose){outstream.println("Making refMap2.");}
426 		refMap2=new LinkedHashMap<String, BaseGraph>((refMap.size()*3)/2);
427 		for(Entry<String, BaseGraph> e : refMap.entrySet()){
428 			String key=e.getKey();
429 			if(verbose){outstream.println("Considering "+key);}
430 			BaseGraph value=e.getValue();
431 			String key2=Tools.trimToWhitespace(key);
432 			if(verbose){outstream.println("key2="+key2);}
433 //			if(verbose){outstream.println("put "+key2+", "+value);}
434 			refMap2.put(key2, value);
435 //			if(verbose){outstream.println("putted "+key2+", "+value);}
436 
437 			if(defaultRname==null){defaultRname=key;}
438 		}
439 //		assert(false) : refMap+"\n"+refMap2;
440 		if(verbose){outstream.println("Made refMap2.");}
441 	}
442 
443 	private ConcurrentReadInputStream makeRefCris(){
444 		ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffref, null);
445 		cris.start(); //Start the stream
446 		if(verbose){outstream.println("Started cris");}
447 		boolean paired=cris.paired();
448 		assert(!paired) : "References should not be paired.";
449 		return cris;
450 	}
451 
452 	private SamStreamer makeStreamer(FileFormat ff){
453 		if(ff==null){return null;}
454 		SamStreamer ss=new SamReadStreamer(ff, streamerThreads, true, maxReads);
455 		ss.start(); //Start the stream
456 		if(verbose){outstream.println("Started Streamer");}
457 		return ss;
458 	}
459 
460 	private ConcurrentReadInputStream makeCris(FileFormat ff){
461 		ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ff, null);
462 		cris.start(); //Start the stream
463 		if(verbose){outstream.println("Started cris");}
464 		boolean paired=cris.paired();
465 		assert(!paired);
466 		return cris;
467 	}
468 
469 	private ConcurrentReadOutputStream makeCros(){
470 		if(ffout==null){return null;}
471 
472 		//Select output buffer size based on whether it needs to be ordered
473 		final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
474 
475 		final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout, null, buff, null, false);
476 		ros.start(); //Start the stream
477 		return ros;
478 	}
479 
480 	/*--------------------------------------------------------------*/
481 	/*----------------       Thread Management      ----------------*/
482 	/*--------------------------------------------------------------*/
483 
484 	/** Spawn process threads */
485 	private void spawnThreads(final SamStreamer ss){
486 
487 		//Do anything necessary prior to processing
488 
489 		//Determine how many threads may be used
490 		final int threads=Shared.threads();
491 
492 		//Fill a list with ProcessThreads
493 		ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
494 		for(int i=0; i<threads; i++){
495 			alpt.add(new ProcessThread(ss, i));
496 		}
497 
498 		//Start the threads
499 		for(ProcessThread pt : alpt){
500 			pt.start();
501 		}
502 		if(verbose){outstream.println("Started worker threads.");}
503 
504 		//Wait for threads to finish
505 		boolean success=ThreadWaiter.waitForThreads(alpt, this);
506 		errorState&=!success;
507 
508 		//Do anything necessary after processing
509 
510 	}
511 
512 	/** Spawn process threads */
513 	private void spawnThreads(final ConcurrentReadInputStream cris){
514 
515 		//Do anything necessary prior to processing
516 
517 		//Determine how many threads may be used
518 		final int threads=Shared.threads();
519 
520 		//Fill a list with ProcessThreads
521 		ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
522 		for(int i=0; i<threads; i++){
523 			alpt.add(new ProcessThread(cris, i));
524 		}
525 
526 		//Start the threads
527 		for(ProcessThread pt : alpt){
528 			pt.start();
529 		}
530 		if(verbose){outstream.println("Started worker threads.");}
531 
532 		//Wait for threads to finish
533 		boolean success=ThreadWaiter.waitForThreads(alpt, this);
534 		errorState&=!success;
535 
536 		//Do anything necessary after processing
537 		ReadWrite.closeStreams(cris);
538 	}
539 
540 	private void outputConsensus(ConcurrentReadOutputStream ros){
541 		if(verbose){outstream.println("Making consensus.");}
542 		ArrayList<Read> consensusList=new ArrayList<Read>(200);
543 		ArrayList<BaseGraph> graphList=new ArrayList<BaseGraph>(200);
544 		long num=0;
545 		for(Entry<String, BaseGraph> e : refMap.entrySet()){
546 			BaseGraph bg=e.getValue();
547 			graphList.add(bg);
548 
549 			Read r=bg.traverse();
550 			if(name!=null){r.id=name;}
551 			consensusList.add(r);
552 
553 			refCount+=bg.refCount;
554 			subCount+=bg.subCount;
555 			delCount+=bg.delCount;
556 			insCount+=bg.insCount;
557 
558 			readsOut++;
559 			basesOut+=r.length();
560 
561 			if(consensusList.size()>=200){
562 				if(ros!=null){ros.add(consensusList, num);}
563 				consensusList=new ArrayList<Read>(200);
564 				num++;
565 			}
566 		}
567 		if(consensusList.size()>0){
568 			if(ros!=null){ros.add(consensusList, num);}
569 			consensusList=new ArrayList<Read>(200);
570 			num++;
571 		}
572 		if(graphList.size()>0 && ffmodel!=null){
573 //			ReadWrite.writeObjectInThread(graphList, ffmodel.name(), true);
574 			ReadWrite.writeAsync(graphList, ffmodel.name(), true);
575 		}
576 	}
577 
578 	@Override
579 	public final void accumulate(ProcessThread pt){
580 		readsProcessed+=pt.readsProcessedT;
581 		basesProcessed+=pt.basesProcessedT;
582 		alignedReads+=pt.alignedReadsT;
583 		identitySum+=pt.identitySumT;
584 		scoreSum+=pt.scoreSumT;
585 		for(int i=0; i<idHist.length; i++){
586 			idHist[i]+=pt.idHistT[i];
587 			scoreHist[i]+=pt.scoreHistT[i];
588 		}
589 		errorState|=(!pt.success);
590 	}
591 
592 	@Override
593 	public final boolean success(){return !errorState;}
594 
595 	/*--------------------------------------------------------------*/
596 	/*----------------         Inner Methods        ----------------*/
597 	/*--------------------------------------------------------------*/
598 
599 	@Override
600 	public ByteBuilder toText() {
601 		// TODO Auto-generated method stub
602 		return null;
603 	}
604 
605 	/*--------------------------------------------------------------*/
606 	/*----------------         Inner Classes        ----------------*/
607 	/*--------------------------------------------------------------*/
608 
609 	/** This class is static to prevent accidental writing to shared variables.
610 	 * It is safe to remove the static modifier. */
611 	class ProcessThread extends Thread {
612 
613 		//Constructor
614 		ProcessThread(final SamStreamer ss_, final int tid_){
615 			ss=ss_;
616 			cris=null;
617 			tid=tid_;
618 			realigner=(realign ? new Realigner() : null);
619 		}
620 
621 		//Constructor
622 		ProcessThread(final ConcurrentReadInputStream cris_, final int tid_){
623 			ss=null;
624 			cris=cris_;
625 			tid=tid_;
626 			realigner=null;//(realign ? new Realigner() : null);
627 		}
628 
629 		//Called by start()
630 		@Override
631 		public void run(){
632 			//Do anything necessary prior to processing
633 
634 			//Process the reads
635 			processInner();
636 
637 			//Do anything necessary after processing
638 
639 			//Indicate successful exit status
640 			success=true;
641 		}
642 
643 		/** Iterate through the reads */
644 		void processInner(){
645 
646 
647 			//Grab and process all lists
648 
649 			if(ss!=null){
650 				for(ListNum<Read> ln=ss.nextReads(); ln!=null; ln=ss.nextReads()){
651 					processList(ln);
652 				}
653 			}else{
654 				for(ListNum<Read> ln=cris.nextList(); ln!=null && ln.size()>0; ln=cris.nextList()){
655 					processList(ln);
656 					cris.returnList(ln);
657 				}
658 			}
659 
660 		}
661 
662 		void processList(ListNum<Read> ln){
663 
664 			//Loop through each read in the list
665 			for(Read r : ln){
666 
667 				//Validate reads in worker threads
668 				if(!r.validated()){r.validate(true);}
669 
670 				//Track the initial length for statistics
671 				final int initialLength=r.length();
672 
673 				//Increment counters
674 				readsProcessedT++;
675 				basesProcessedT+=initialLength;
676 
677 				{
678 					//Reads are processed in this block.
679 					processRead(r);
680 				}
681 			}
682 		}
683 
684 		/**
685 		 * Process a read or a read pair.
686 		 * @param r Read 1
687 		 * @param r2 Read 2 (may be null)
688 		 * @return True if the reads should be kept, false if they should be discarded.
689 		 */
690 		void processRead(final Read r){
691 			if(r.bases==null || r.length()<=1){return;}
692 
693 			final SamLine sl=r.samline;
694 			if(sl!=null && !sl.mapped()){return;}
695 			final String rname=(sl==null ? defaultRname : sl.rnameS());
696 			BaseGraph bg=refMap.get(rname);
697 			if(bg==null){bg=refMap2.get(Tools.trimToWhitespace(rname));}
698 			assert(bg!=null) : "Can't find graph for "+rname;
699 			float identity;
700 			float score=0;//unused
701 
702 //			assert(false) : r;
703 			if(sl==null){
704 //				identity=SketchObject.alignAndMakeMatch(r, bg.original);
705 //				assert(false) : bg.refWeights[0];
706 //				identity=SketchObject.alignAndMakeMatch(r, bg.original, bg.refWeights, bg.insWeights, bg.delWeights);
707 				identity=SketchObject.alignAndMakeMatch(r, bg.original, bg.refWeights);
708 				if(identity<=0){return;}
709 				assert(r.match!=null) : identity+", "+r;
710 			}else{
711 				assert(sl!=null && sl.mapped() && sl.seq!=null) : sl;
712 				assert(r.match!=null);
713 				if(samFilter!=null && !samFilter.passesFilter(sl)){return;}
714 				identity=sl.calcIdentity();
715 			}
716 			assert(r.match!=null && identity>0) : identity+", "+r;
717 
718 			if(inModel!=null){
719 
720 				if(verbose){System.err.println(r.start+"\t"+r.stop+"\t"+new String(r.match));}
721 				score=inModel.score(r, false, true);
722 				if(printScores){
723 //					System.err.println(identity+"\t"+score+"\t"+inModel.score(r, false, true));
724 					System.err.println(String.format("%.5f\t%.5f", identity, score));
725 				}
726 //				assert(false);
727 //				if(score<0){
728 //					verbose=true;
729 //					inModel.score(r, false, false);
730 //				}
731 //				assert(!verbose);
732 			}
733 
734 			alignedReadsT++;
735 			identitySumT+=identity;
736 			scoreSumT+=score;
737 			idHistT[Tools.mid(0, Math.round(100*identity), 100)]++;
738 			scoreHistT[Tools.mid(0, Math.round(100*score), 100)]++;
739 
740 
741 
742 //			assert(sl.calcIdentity()<=0.78) : sl.calcIdentity()+", "+samFilter.maxId;
743 //			assert(false) : sl.cigar+", "+sl.calcIdentity();
744 
745 //			System.err.println(rname);
746 
747 			if(realigner!=null) {
748 				assert(false);
749 				realigner.realign(r, sl, bg.original, true);
750 			}
751 
752 			bg.add(r);
753 		}
754 
755 		long[] idHistT=new long[101];
756 		long[] scoreHistT=new long[101];
757 
758 		/** Number of reads processed by this thread */
759 		protected long readsProcessedT=0;
760 		/** Number of bases processed by this thread */
761 		protected long basesProcessedT=0;
762 
763 		protected long alignedReadsT=0;
764 
765 		double identitySumT=0;
766 		double scoreSumT=0;
767 
768 		/** True only if this thread has completed successfully */
769 		boolean success=false;
770 
771 		/** Shared input stream */
772 		private final SamStreamer ss;
773 		/** Alternate input stream */
774 		private final ConcurrentReadInputStream cris;
775 		/** Thread ID */
776 		final int tid;
777 		/** For realigning reads */
778 		final Realigner realigner;
779 	}
780 
781 	/*--------------------------------------------------------------*/
782 	/*----------------            Fields            ----------------*/
783 	/*--------------------------------------------------------------*/
784 
785 	/** Read input file path */
786 	private String in=null;
787 	/** Reference input file path */
788 	private String ref=null;
789 
790 	private String inModelFile;
791 	private BaseGraph inModel;
792 	private String outHistFile;
793 
794 	/** Consensus output file path */
795 	private String out=null;
796 	/** Model output file path */
797 	private String outModel=null;
798 
799 	/** Override input file extension */
800 	private String extin=null;
801 	/** Override output file extension */
802 	private String extout=null;
803 
804 	/*--------------------------------------------------------------*/
805 
806 	/** Number of reads processed */
807 	protected long readsProcessed=0;
808 	/** Number of bases processed */
809 	protected long basesProcessed=0;
810 
811 	protected long alignedReads=0;
812 
813 	protected double identitySum=0;
814 	protected double scoreSum=0;
815 
816 	/** Number of reads retained */
817 	protected long readsOut=0;
818 	/** Number of bases retained */
819 	protected long basesOut=0;
820 
821 	/** Quit after processing this many input reads; -1 means no limit */
822 	private long maxReads=-1;
823 
824 	public long subCount=0;
825 	public long refCount=0;
826 	public long delCount=0;
827 	public long insCount=0;
828 
829 	long[] idHist=new long[101];
830 	long[] scoreHist=new long[101];
831 	boolean printScores=false;
832 
833 	/*--------------------------------------------------------------*/
834 
835 	/** Threads dedicated to reading the sam file */
836 	private int streamerThreads=SamStreamer.DEFAULT_THREADS;
837 
838 	private String name=null;
839 
840 	private boolean loadedRef=false;
841 	private boolean specialRef=false;
842 
843 	private boolean realign=false;
844 
845 	private int ploidy=1;
846 
847 	private final boolean silent;
848 
849 	public final SamFilter samFilter=new SamFilter();
850 	/** Uses full ref names */
851 	public LinkedHashMap<String, BaseGraph> refMap;
852 	/** Uses truncated ref names */
853 	public LinkedHashMap<String, BaseGraph> refMap2;
854 
855 	private String defaultRname=null;
856 
857 	/*--------------------------------------------------------------*/
858 	/*----------------         Final Fields         ----------------*/
859 	/*--------------------------------------------------------------*/
860 
861 	/** Read input file */
862 	private final FileFormat ffin;
863 	/** Reference input file */
864 	private final FileFormat ffref;
865 
866 	/** Consensus output file */
867 	private final FileFormat ffout;
868 	/** Model output file */
869 	private final FileFormat ffmodel;
870 
871 	/*--------------------------------------------------------------*/
872 	/*----------------        Common Fields         ----------------*/
873 	/*--------------------------------------------------------------*/
874 
875 	/** Print status messages to this output stream */
876 	private PrintStream outstream=System.err;
877 	/** True if an error was encountered */
878 	public boolean errorState=false;
879 	/** Overwrite existing output files */
880 	private boolean overwrite=false;
881 	/** Append to existing output files */
882 	private boolean append=false;
883 	/** Reads ARE output in input order, even though this is false */
884 	private final boolean ordered=false;
885 
886 }
887