1 package template;
2 
3 import java.io.PrintStream;
4 import java.util.ArrayList;
5 
6 import fileIO.ByteFile;
7 import fileIO.FileFormat;
8 import fileIO.ReadWrite;
9 import shared.Parse;
10 import shared.Parser;
11 import shared.PreParser;
12 import shared.ReadStats;
13 import shared.Shared;
14 import shared.Timer;
15 import shared.Tools;
16 import stream.ConcurrentReadInputStream;
17 import stream.ConcurrentReadOutputStream;
18 import stream.FastaReadInputStream;
19 import stream.Read;
20 import stream.SamLine;
21 import stream.SamReadStreamer;
22 import stream.SamStreamer;
23 import structures.ListNum;
24 import var2.Realigner;
25 import var2.SamFilter;
26 import var2.ScafMap;
27 
28 /**
29  * This class does nothing.
30  * It is designed to be easily modified into a program
31  * that processes reads in multiple threads, by
32  * filling in the processRead method.
33  *
34  * @author Brian Bushnell
35  * @date September 6, 2019
36  *
37  */
38 public class A_SampleSamStreamer implements Accumulator<A_SampleSamStreamer.ProcessThread> {
39 
40 	/*--------------------------------------------------------------*/
41 	/*----------------        Initialization        ----------------*/
42 	/*--------------------------------------------------------------*/
43 
44 	/**
45 	 * Code entrance from the command line.
46 	 * @param args Command line arguments
47 	 */
main(String[] args)48 	public static void main(String[] args){
49 		//Start a timer immediately upon code entrance.
50 		Timer t=new Timer();
51 
52 		//Create an instance of this class
53 		A_SampleSamStreamer x=new A_SampleSamStreamer(args);
54 
55 		//Run the object
56 		x.process(t);
57 
58 		//Close the print stream if it was redirected
59 		Shared.closeStream(x.outstream);
60 	}
61 
62 	/**
63 	 * Constructor.
64 	 * @param args Command line arguments
65 	 */
A_SampleSamStreamer(String[] args)66 	public A_SampleSamStreamer(String[] args){
67 
68 		{//Preparse block for help, config files, and outstream
69 			PreParser pp=new PreParser(args, getClass(), false);
70 			args=pp.args;
71 			outstream=pp.outstream;
72 		}
73 
74 		//Set shared static variables prior to parsing
75 		ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
76 		ReadWrite.MAX_ZIP_THREADS=Shared.threads();
77 
78 //		samFilter.includeUnmapped=false;
79 //		samFilter.includeSupplimentary=false;
80 //		samFilter.includeDuplicate=false;
81 //		samFilter.includeNonPrimary=false;
82 //		samFilter.includeQfail=false;
83 //		samFilter.minMapq=4;
84 
85 		{//Parse the arguments
86 			final Parser parser=parse(args);
87 
88 			Parser.processQuality();
89 
90 			maxReads=parser.maxReads;
91 			overwrite=ReadStats.overwrite=parser.overwrite;
92 			append=ReadStats.append=parser.append;
93 
94 			in=parser.in1;
95 			extin=parser.extin;
96 
97 			out=parser.out1;
98 			extout=parser.extout;
99 		}
100 
101 		{
102 //			if("auto".equalsIgnoreCase(atomic)){Scaffold.setCA3A(Shared.threads()>8);}
103 //			else{Scaffold.setCA3A(Parse.parseBoolean(atomic));}
104 
105 			if(ploidy<1){System.err.println("WARNING: ploidy not set; assuming ploidy=1."); ploidy=1;}
106 			samFilter.setSamtoolsFilter();
107 
108 			streamerThreads=Tools.max(1, Tools.min(streamerThreads, Shared.threads()));
109 			assert(streamerThreads>0) : streamerThreads;
110 		}
111 
112 		validateParams();
113 		fixExtensions(); //Add or remove .gz or .bz2 as needed
114 		checkFileExistence(); //Ensure files can be read and written
115 		checkStatics(); //Adjust file-related static fields as needed for this program
116 
117 		//Create output FileFormat objects
118 		ffout=FileFormat.testOutput(out, FileFormat.SAM, extout, true, overwrite, append, ordered);
119 		assert(false) : "TODO: Default output format might be fasta.";
120 
121 		//Create input FileFormat objects
122 		ffin=FileFormat.testInput(in, FileFormat.SAM, extin, true, true);
123 		ffref=FileFormat.testInput(ref, FileFormat.FASTA, null, true, true);
124 	}
125 
126 	/*--------------------------------------------------------------*/
127 	/*----------------    Initialization Helpers    ----------------*/
128 	/*--------------------------------------------------------------*/
129 
130 	/** Parse arguments from the command line */
parse(String[] args)131 	private Parser parse(String[] args){
132 
133 		//Create a parser object
134 		Parser parser=new Parser();
135 
136 		//Set any necessary Parser defaults here
137 		//parser.foo=bar;
138 
139 		//Parse each argument
140 		for(int i=0; i<args.length; i++){
141 			String arg=args[i];
142 
143 			//Break arguments into their constituent parts, in the form of "a=b"
144 			String[] split=arg.split("=");
145 			String a=split[0].toLowerCase();
146 			String b=split.length>1 ? split[1] : null;
147 			if(b!=null && b.equalsIgnoreCase("null")){b=null;}
148 
149 			if(a.equals("verbose")){
150 				verbose=Parse.parseBoolean(b);
151 			}else if(a.equals("ref")){
152 				ref=b;
153 			}else if(a.equals("ordered")){
154 				ordered=Parse.parseBoolean(b);
155 			}else if(a.equals("realign")){
156 				realign=Parse.parseBoolean(b);
157 			}else if(a.equals("ploidy")){
158 				ploidy=Integer.parseInt(b);
159 			}else if(a.equals("clearfilters")){
160 				if(Parse.parseBoolean(b)){
161 					samFilter.clear();
162 				}
163 			}else if(a.equals("parse_flag_goes_here")){
164 				long fake_variable=Parse.parseKMG(b);
165 				//Set a variable here
166 			}else if(samFilter.parse(arg, a, b)){
167 				//do nothing
168 			}else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
169 				//do nothing
170 			}else{
171 				outstream.println("Unknown parameter "+args[i]);
172 				assert(false) : "Unknown parameter "+args[i];
173 			}
174 		}
175 
176 		return parser;
177 	}
178 
179 	/** Add or remove .gz or .bz2 as needed */
fixExtensions()180 	private void fixExtensions(){
181 		in=Tools.fixExtension(in);
182 		ref=Tools.fixExtension(ref);
183 	}
184 
185 	/** Ensure files can be read and written */
checkFileExistence()186 	private void checkFileExistence(){
187 
188 		//Ensure there is an input file
189 		if(in==null){throw new RuntimeException("Error - an input file is required.");}
190 
191 		//Ensure there is an input file
192 		assert(false) : "TODO: Check.";
193 		if(ref==null){throw new RuntimeException("Error - a reference file is required.");}
194 
195 		//Ensure output files can be written
196 		if(!Tools.testOutputFiles(overwrite, append, false, out)){
197 			outstream.println((out==null)+", "+out);
198 			throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out+"\n");
199 		}
200 
201 		//Ensure input files can be read
202 		if(!Tools.testInputFiles(false, true, in, ref)){
203 			throw new RuntimeException("\nCan't read some input files.\n");
204 		}
205 
206 		//Ensure that no file was specified multiple times
207 		if(!Tools.testForDuplicateFiles(true, in, ref, out)){
208 			throw new RuntimeException("\nSome file names were specified multiple times.\n");
209 		}
210 	}
211 
212 	/** Adjust file-related static fields as needed for this program */
checkStatics()213 	private static void checkStatics(){
214 		//Adjust the number of threads for input file reading
215 		if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
216 			ByteFile.FORCE_MODE_BF2=true;
217 		}
218 
219 		assert(FastaReadInputStream.settingsOK());
220 	}
221 
222 	/** Ensure parameter ranges are within bounds and required parameters are set */
validateParams()223 	private boolean validateParams(){
224 //		assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
225 		assert(false) : "TODO";
226 		return true;
227 	}
228 
229 	/*--------------------------------------------------------------*/
230 	/*----------------         Outer Methods        ----------------*/
231 	/*--------------------------------------------------------------*/
232 
233 	/** Create read streams and process all data */
process(Timer t)234 	void process(Timer t){
235 
236 		//Turn off read validation in the input threads to increase speed
237 		final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
238 		Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
239 
240 		//Create a read input stream
241 		final SamStreamer ss=makeStreamer(ffin);
242 
243 		//Load reference, if desired (and if present);
244 		loadScafMapFromReference();
245 //		loadReferenceCustom();
246 
247 		//Optionally create a read output stream
248 		final ConcurrentReadOutputStream ros=makeCros();
249 
250 		//Reset counters
251 		readsProcessed=readsOut=0;
252 		basesProcessed=basesOut=0;
253 
254 		//Process the reads in separate threads
255 		spawnThreads(ss, ros);
256 
257 		if(verbose){outstream.println("Finished; closing streams.");}
258 
259 		//Write anything that was accumulated by ReadStats
260 		errorState|=ReadStats.writeAll();
261 		//Close the read streams
262 		errorState|=ReadWrite.closeStream(ros);
263 
264 		//Reset read validation
265 		Read.VALIDATE_IN_CONSTRUCTOR=vic;
266 
267 		//Report timing and results
268 		t.stop();
269 		outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
270 		outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
271 
272 		//Throw an exception of there was an error in a thread
273 		if(errorState){
274 			throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
275 		}
276 	}
277 
278 	private void loadScafMapFromReference(){
279 		if(loadedRef){return;}
280 		assert(ref!=null);
281 		scafMap=ScafMap.loadReference(ref, scafMap, samFilter, true);
282 		if(realign){Realigner.map=scafMap;}
283 		loadedRef=true;
284 	}
285 
286 	private void loadReferenceCustom(){
287 		ConcurrentReadInputStream cris=makeRefCris();
288 		for(ListNum<Read> ln=cris.nextList(); ln!=null && ln.size()>0; ln=cris.nextList()) {
289 			//Do something
290 			assert(false) : "TODO";
291 		}
292 	}
293 
294 	private ConcurrentReadInputStream makeRefCris(){
295 		ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffref, null);
296 		cris.start(); //Start the stream
297 		if(verbose){outstream.println("Started cris");}
298 		boolean paired=cris.paired();
299 		assert(!paired) : "References should not be paired.";
300 		return cris;
301 	}
302 
303 	private SamStreamer makeStreamer(FileFormat ff){
304 		if(ff==null){return null;}
305 		SamStreamer ss=new SamReadStreamer(ff, streamerThreads, true, maxReads);
306 		ss.start(); //Start the stream
307 		if(verbose){outstream.println("Started Streamer");}
308 		return ss;
309 	}
310 
311 	private ConcurrentReadOutputStream makeCros(){
312 		if(ffout==null){return null;}
313 
314 		//Select output buffer size based on whether it needs to be ordered
315 		final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
316 
317 		final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout, null, buff, null, false);
318 		ros.start(); //Start the stream
319 		return ros;
320 	}
321 
322 	/*--------------------------------------------------------------*/
323 	/*----------------       Thread Management      ----------------*/
324 	/*--------------------------------------------------------------*/
325 
326 	/** Spawn process threads */
327 	private void spawnThreads(final SamStreamer ss, final ConcurrentReadOutputStream ros){
328 
329 		//Do anything necessary prior to processing
330 
331 		//Determine how many threads may be used
332 		final int threads=Shared.threads();
333 
334 		//Fill a list with ProcessThreads
335 		ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
336 		for(int i=0; i<threads; i++){
337 			alpt.add(new ProcessThread(ss, ros, i));
338 		}
339 
340 		//Start the threads
341 		for(ProcessThread pt : alpt){
342 			pt.start();
343 		}
344 
345 		//Wait for threads to finish
346 		boolean success=ThreadWaiter.waitForThreads(alpt, this);
347 		errorState&=!success;
348 
349 		//Do anything necessary after processing
350 
351 	}
352 
353 	@Override
354 	public final void accumulate(ProcessThread pt){
355 		readsProcessed+=pt.readsProcessedT;
356 		basesProcessed+=pt.basesProcessedT;
357 		readsOut+=pt.readsOutT;
358 		basesOut+=pt.basesOutT;
359 		errorState|=(!pt.success);
360 	}
361 
362 	@Override
363 	public final boolean success(){return !errorState;}
364 
365 	/*--------------------------------------------------------------*/
366 	/*----------------         Inner Methods        ----------------*/
367 	/*--------------------------------------------------------------*/
368 
369 	/*--------------------------------------------------------------*/
370 	/*----------------         Inner Classes        ----------------*/
371 	/*--------------------------------------------------------------*/
372 
373 	/** This class is static to prevent accidental writing to shared variables.
374 	 * It is safe to remove the static modifier. */
375 	class ProcessThread extends Thread {
376 
377 		//Constructor
378 		ProcessThread(final SamStreamer ss_, final ConcurrentReadOutputStream ros_, final int tid_){
379 			ss=ss_;
380 			ros=ros_;
381 			tid=tid_;
382 			realigner=(realign ? new Realigner() : null);
383 		}
384 
385 		//Called by start()
386 		@Override
387 		public void run(){
388 			//Do anything necessary prior to processing
389 
390 			//Process the reads
391 			processInner();
392 
393 			//Do anything necessary after processing
394 
395 			//Indicate successful exit status
396 			success=true;
397 		}
398 
399 		/** Iterate through the reads */
400 		void processInner(){
401 
402 			//Grab and process all lists
403 			for(ListNum<Read> ln=ss.nextReads(); ln!=null; ln=ss.nextReads()){
404 //				if(verbose){outstream.println("Got list of size "+list.size());} //Disabled due to non-static access
405 
406 				processList(ln);
407 			}
408 
409 		}
410 
411 		void processList(ListNum<Read> ln){
412 
413 			//Grab the actual read list from the ListNum
414 			final ArrayList<Read> reads=ln.list;
415 
416 			//Loop through each read in the list
417 			for(int idx=0; idx<reads.size(); idx++){
418 				final Read r=reads.get(idx);
419 
420 				//Validate reads in worker threads
421 				if(!r.validated()){r.validate(true);}
422 
423 				//Track the initial length for statistics
424 				final int initialLength=r.length();
425 
426 				//Increment counters
427 				readsProcessedT+=r.pairCount();
428 				basesProcessedT+=initialLength;
429 
430 				{
431 					//Reads are processed in this block.
432 					boolean keep=processRead(r);
433 
434 					if(!keep){reads.set(idx, null);}
435 					else{
436 						readsOutT++;
437 						basesOutT+=r.length();
438 					}
439 				}
440 			}
441 
442 			//Output reads to the output stream
443 			if(ros!=null){ros.add(reads, ln.id);}
444 		}
445 
446 		/**
447 		 * Process a read or a read pair.
448 		 * @param r Read 1
449 		 * @param r2 Read 2 (may be null)
450 		 * @return True if the reads should be kept, false if they should be discarded.
451 		 */
452 		boolean processRead(final Read r){
453 			if(r.bases==null || r.length()<=1){return false;}
454 			final SamLine sl=r.samline;
455 			if(samFilter!=null && !samFilter.passesFilter(sl)){return false;}
456 
457 //			System.err.println("A: "+sl);
458 
459 //			final SamLine oldSL=new SamLine(sl);
460 //			final Read oldRead=r.clone();
461 
462 			assert(false) : "TODO";
463 			return true;
464 		}
465 
466 		/** Number of reads processed by this thread */
467 		protected long readsProcessedT=0;
468 		/** Number of bases processed by this thread */
469 		protected long basesProcessedT=0;
470 
471 		/** Number of reads retained by this thread */
472 		protected long readsOutT=0;
473 		/** Number of bases retained by this thread */
474 		protected long basesOutT=0;
475 
476 		/** True only if this thread has completed successfully */
477 		boolean success=false;
478 
479 		/** Shared input stream */
480 		private final SamStreamer ss;
481 		/** Shared output stream */
482 		private final ConcurrentReadOutputStream ros;
483 		/** Thread ID */
484 		final int tid;
485 		/** For realigning reads */
486 		final Realigner realigner;
487 	}
488 
489 	/*--------------------------------------------------------------*/
490 	/*----------------            Fields            ----------------*/
491 	/*--------------------------------------------------------------*/
492 
	/** Primary input file path */
	private String in=null;
	/** Reference file path; set by the "ref" flag and required by checkFileExistence */
	private String ref=null;

	/** Primary output file path */
	private String out=null;

	/** Override input file extension */
	private String extin=null;
	/** Override output file extension */
	private String extout=null;

	/*--------------------------------------------------------------*/

	/** Number of reads processed */
	protected long readsProcessed=0;
	/** Number of bases processed */
	protected long basesProcessed=0;

	/** Number of reads retained */
	protected long readsOut=0;
	/** Number of bases retained */
	protected long basesOut=0;

	/** Quit after processing this many input reads; -1 means no limit */
	private long maxReads=-1;

	/*--------------------------------------------------------------*/

	/** Threads dedicated to reading the sam file */
	private int streamerThreads=SamStreamer.DEFAULT_THREADS;

	/** True once loadScafMapFromReference has loaded the reference */
	private boolean loadedRef=false;

	/** Set by the "realign" flag; when true, each ProcessThread creates a Realigner
	 * and Realigner.map is pointed at scafMap after the reference is loaded */
	private boolean realign=false;

	/** Set by the "ploidy" flag; the constructor resets values below 1 to 1 */
	private int ploidy=1;

	/** Assigned from ScafMap.loadReference in loadScafMapFromReference */
	public ScafMap scafMap;
	/** Filter applied to each read's SamLine in processRead; also receives command line flags in parse */
	public final SamFilter samFilter=new SamFilter();

	/*--------------------------------------------------------------*/
	/*----------------         Final Fields         ----------------*/
	/*--------------------------------------------------------------*/

	/** Primary input file */
	private final FileFormat ffin;
	/** Reference file */
	private final FileFormat ffref;

	/** Primary output file */
	private final FileFormat ffout;

	/*--------------------------------------------------------------*/
	/*----------------        Common Fields         ----------------*/
	/*--------------------------------------------------------------*/

	/** Print status messages to this output stream */
	private PrintStream outstream=System.err;
	/** Print verbose messages */
	public static boolean verbose=false;
	/** True if an error was encountered */
	public boolean errorState=false;
	/** Overwrite existing output files */
	private boolean overwrite=false;
	/** Append to existing output files */
	private boolean append=false;
	/** Reads are output in input order */
	private boolean ordered=false;
563 
564 }
565