1 package jgi;
2 
3 import java.io.File;
4 import java.io.PrintStream;
5 import java.util.ArrayList;
6 import java.util.Arrays;
7 
8 import dna.AminoAcid;
9 import fileIO.ByteFile;
10 import fileIO.FileFormat;
11 import fileIO.ReadWrite;
12 import shared.KillSwitch;
13 import shared.Parse;
14 import shared.Parser;
15 import shared.PreParser;
16 import shared.ReadStats;
17 import shared.Shared;
18 import shared.Timer;
19 import shared.Tools;
20 import stream.ConcurrentReadInputStream;
21 import stream.ConcurrentReadOutputStream;
22 import stream.FASTQ;
23 import stream.FastaReadInputStream;
24 import stream.Read;
25 import structures.ByteBuilder;
26 import structures.ListNum;
27 
28 /**
29  * This class does nothing.
30  * It is designed to be easily modified into a program
31  * that processes reads in a single thread.
32  *
33  * @author Brian Bushnell
34  * @date June 20, 2014
35  *
36  */
37 public class AdjustHomopolymers {
38 
39 	/*--------------------------------------------------------------*/
40 	/*----------------        Initialization        ----------------*/
41 	/*--------------------------------------------------------------*/
42 
43 	/**
44 	 * Code entrance from the command line.
45 	 * @param args Command line arguments
46 	 */
main(String[] args)47 	public static void main(String[] args){
48 		//Start a timer immediately upon code entrance.
49 		Timer t=new Timer();
50 
51 		//Create an instance of this class
52 		AdjustHomopolymers x=new AdjustHomopolymers(args);
53 
54 		//Run the object
55 		x.process(t);
56 
57 		//Close the print stream if it was redirected
58 		Shared.closeStream(x.outstream);
59 	}
60 
61 	/**
62 	 * Constructor.
63 	 * @param args Command line arguments
64 	 */
AdjustHomopolymers(String[] args)65 	public AdjustHomopolymers(String[] args){
66 
67 		{//Preparse block for help, config files, and outstream
68 			PreParser pp=new PreParser(args, getClass(), false);
69 			args=pp.args;
70 			outstream=pp.outstream;
71 		}
72 
73 		//Set shared static variables prior to parsing
74 		ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
75 		ReadWrite.MAX_ZIP_THREADS=Shared.threads();
76 		Shared.capBuffers(4); //Only for singlethreaded programs
77 
78 		{//Parse the arguments
79 			final Parser parser=parse(args);
80 			Parser.processQuality();
81 
82 			maxReads=parser.maxReads;
83 			overwrite=ReadStats.overwrite=parser.overwrite;
84 			append=ReadStats.append=parser.append;
85 			setInterleaved=parser.setInterleaved;
86 
87 			in1=parser.in1;
88 			in2=parser.in2;
89 			qfin1=parser.qfin1;
90 			qfin2=parser.qfin2;
91 			extin=parser.extin;
92 
93 			out1=parser.out1;
94 			out2=parser.out2;
95 			qfout1=parser.qfout1;
96 			qfout2=parser.qfout2;
97 			extout=parser.extout;
98 		}
99 
100 		assert(rate!=0) : "'rate' should be set to a positive or negative value, such as rate=0.1 to expand homopolymers by 10%.";
101 
102 		doPoundReplacement(); //Replace # with 1 and 2
103 		adjustInterleaving(); //Make sure interleaving agrees with number of input and output files
104 		fixExtensions(); //Add or remove .gz or .bz2 as needed
105 		checkFileExistence(); //Ensure files can be read and written
106 		checkStatics(); //Adjust file-related static fields as needed for this program
107 
108 		//Create output FileFormat objects
109 		ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
110 		ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, ordered);
111 
112 		//Create input FileFormat objects
113 		ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
114 		ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);
115 	}
116 
117 	/*--------------------------------------------------------------*/
118 	/*----------------    Initialization Helpers    ----------------*/
119 	/*--------------------------------------------------------------*/
120 
121 	/** Parse arguments from the command line */
parse(String[] args)122 	private Parser parse(String[] args){
123 
124 		//Create a parser object
125 		Parser parser=new Parser();
126 
127 		//Set any necessary Parser defaults here
128 		//parser.foo=bar;
129 
130 		//Parse each argument
131 		for(int i=0; i<args.length; i++){
132 			String arg=args[i];
133 
134 			//Break arguments into their constituent parts, in the form of "a=b"
135 			String[] split=arg.split("=");
136 			String a=split[0].toLowerCase();
137 			String b=split.length>1 ? split[1] : null;
138 			if(b!=null && b.equalsIgnoreCase("null")){b=null;}
139 
140 			if(a.equals("verbose")){
141 				verbose=Parse.parseBoolean(b);
142 			}else if(a.equals("rate")){
143 				rate=Float.parseFloat(b);
144 			}else if(a.equals("parse_flag_goes_here")){
145 				long fake_variable=Parse.parseKMG(b);
146 				//Set a variable here
147 			}else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
148 				//do nothing
149 			}else{
150 				outstream.println("Unknown parameter "+args[i]);
151 				assert(false) : "Unknown parameter "+args[i];
152 			}
153 		}
154 
155 		return parser;
156 	}
157 
158 	/** Replace # with 1 and 2 in headers */
doPoundReplacement()159 	private void doPoundReplacement(){
160 		//Do input file # replacement
161 		if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
162 			in2=in1.replace("#", "2");
163 			in1=in1.replace("#", "1");
164 		}
165 
166 		//Do output file # replacement
167 		if(out1!=null && out2==null && out1.indexOf('#')>-1){
168 			out2=out1.replace("#", "2");
169 			out1=out1.replace("#", "1");
170 		}
171 
172 		//Ensure there is an input file
173 		if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}
174 
175 		//Ensure out2 is not set without out1
176 		if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");}
177 	}
178 
179 	/** Add or remove .gz or .bz2 as needed */
fixExtensions()180 	private void fixExtensions(){
181 		in1=Tools.fixExtension(in1);
182 		in2=Tools.fixExtension(in2);
183 		qfin1=Tools.fixExtension(qfin1);
184 		qfin2=Tools.fixExtension(qfin2);
185 	}
186 
187 	/** Ensure files can be read and written */
checkFileExistence()188 	private void checkFileExistence(){
189 		//Ensure output files can be written
190 		if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){
191 			outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2);
192 			throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n");
193 		}
194 
195 		//Ensure input files can be read
196 		if(!Tools.testInputFiles(false, true, in1, in2)){
197 			throw new RuntimeException("\nCan't read some input files.\n");
198 		}
199 
200 		//Ensure that no file was specified multiple times
201 		if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){
202 			throw new RuntimeException("\nSome file names were specified multiple times.\n");
203 		}
204 	}
205 
206 	/** Make sure interleaving agrees with number of input and output files */
adjustInterleaving()207 	private void adjustInterleaving(){
208 		//Adjust interleaved detection based on the number of input files
209 		if(in2!=null){
210 			if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
211 			FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
212 		}
213 
214 		//Adjust interleaved settings based on number of output files
215 		if(!setInterleaved){
216 			assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n";
217 			if(in2!=null){ //If there are 2 input streams.
218 				FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
219 				outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
220 			}else{ //There is one input stream.
221 				if(out2!=null){
222 					FASTQ.FORCE_INTERLEAVED=true;
223 					FASTQ.TEST_INTERLEAVED=false;
224 					outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED);
225 				}
226 			}
227 		}
228 	}
229 
230 	/** Adjust file-related static fields as needed for this program */
checkStatics()231 	private static void checkStatics(){
232 		//Adjust the number of threads for input file reading
233 		if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
234 			ByteFile.FORCE_MODE_BF2=true;
235 		}
236 
237 		assert(FastaReadInputStream.settingsOK());
238 	}
239 
240 	/*--------------------------------------------------------------*/
241 	/*----------------         Outer Methods        ----------------*/
242 	/*--------------------------------------------------------------*/
243 
244 	/** Create read streams and process all data */
process(Timer t)245 	void process(Timer t){
246 
247 		//Create a read input stream
248 		final ConcurrentReadInputStream cris=makeCris();
249 
250 		//Optionally create a read output stream
251 		final ConcurrentReadOutputStream ros=makeCros(cris.paired());
252 
253 		//Reset counters
254 		readsProcessed=readsOut=0;
255 		basesProcessed=basesOut=0;
256 
257 		//Process the read stream
258 		processInner(cris, ros);
259 
260 		if(verbose){outstream.println("Finished; closing streams.");}
261 
262 		//Write anything that was accumulated by ReadStats
263 		errorState|=ReadStats.writeAll();
264 		//Close the read streams
265 		errorState|=ReadWrite.closeStreams(cris, ros);
266 
267 		//Report timing and results
268 		t.stop();
269 		outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
270 		outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
271 
272 		//Throw an exception of there was an error in a thread
273 		if(errorState){
274 			throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
275 		}
276 	}
277 
278 	/*--------------------------------------------------------------*/
279 	/*----------------         Inner Methods        ----------------*/
280 	/*--------------------------------------------------------------*/
281 
makeCris()282 	private ConcurrentReadInputStream makeCris(){
283 		ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2);
284 		cris.start(); //Start the stream
285 		if(verbose){outstream.println("Started cris");}
286 		boolean paired=cris.paired();
287 		if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
288 		return cris;
289 	}
290 
makeCros(boolean pairedInput)291 	private ConcurrentReadOutputStream makeCros(boolean pairedInput){
292 		if(ffout1==null){return null;}
293 
294 		//Select output buffer size based on whether it needs to be ordered
295 		final int buff=4;
296 
297 		//Notify user of output mode
298 		if(pairedInput && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){
299 			outstream.println("Writing interleaved.");
300 		}
301 
302 		final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false);
303 		ros.start(); //Start the stream
304 		return ros;
305 	}
306 
307 	/** Iterate through the reads */
processInner(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros)308 	void processInner(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){
309 
310 		//Do anything necessary prior to processing
311 
312 		{
313 			//Grab the first ListNum of reads
314 			ListNum<Read> ln=cris.nextList();
315 
316 			//Check to ensure pairing is as expected
317 			if(ln!=null && !ln.isEmpty()){
318 				Read r=ln.get(0);
319 				assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired());
320 			}
321 
322 			//As long as there is a nonempty read list...
323 			while(ln!=null && ln.size()>0){
324 //				if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access
325 
326 				processList(ln, cris, ros);
327 
328 				//Fetch a new list
329 				ln=cris.nextList();
330 			}
331 
332 			//Notify the input stream that the final list was used
333 			if(ln!=null){
334 				cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
335 			}
336 		}
337 
338 		//Do anything necessary after processing
339 
340 	}
341 
342 	/**
343 	 * Process a list of Reads.
344 	 * @param ln The list.
345 	 * @param cris Read Input Stream
346 	 * @param ros Read Output Stream for reads that will be retained
347 	 */
processList(ListNum<Read> ln, final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros)348 	void processList(ListNum<Read> ln, final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){
349 
350 		//Grab the actual read list from the ListNum
351 		final ArrayList<Read> reads=ln.list;
352 
353 		//Loop through each read in the list
354 		for(int idx=0; idx<reads.size(); idx++){
355 			final Read r1=reads.get(idx);
356 			final Read r2=r1.mate;
357 
358 			//Validate reads in worker threads
359 			if(!r1.validated()){r1.validate(true);}
360 			if(r2!=null && !r2.validated()){r2.validate(true);}
361 
362 			//Track the initial length for statistics
363 			final int initialLength1=r1.length();
364 			final int initialLength2=r1.mateLength();
365 
366 			//Increment counters
367 			readsProcessed+=r1.pairCount();
368 			basesProcessed+=initialLength1+initialLength2;
369 
370 			{
371 				//Reads are processed in this block.
372 				boolean keep=processReadPair(r1, r2);
373 
374 				if(!keep){reads.set(idx, null);}
375 				else{
376 					readsOut+=r1.pairCount();
377 					basesOut+=r1.pairLength();
378 				}
379 			}
380 		}
381 
382 		//Output reads to the output stream
383 		if(ros!=null){ros.add(reads, ln.id);}
384 
385 		//Notify the input stream that the list was used
386 		cris.returnList(ln);
387 //		if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access
388 	}
389 
390 
391 	/**
392 	 * Process a single read pair.
393 	 * @param r1 Read 1
394 	 * @param r2 Read 2 (may be null)
395 	 * @return True if the reads should be kept, false if they should be discarded.
396 	 */
processReadPair(final Read r1, final Read r2)397 	boolean processReadPair(final Read r1, final Read r2){
398 		processRead(r1);
399 		processRead(r2);
400 		return true;
401 	}
402 
processRead(final Read r)403 	void processRead(final Read r){
404 		if(r==null){return;}
405 		bbBases.clear();
406 		bbQuals.clear();
407 		final byte[] bases=r.bases;
408 		final byte[] quals=(r.quality==null ? fakeQuality(bases.length) : r.quality);
409 
410 		byte prevBase='?';
411 		byte prevQual=20;
412 		byte streak=0;
413 		for(int i=0; i<bases.length; i++){
414 			final byte b=bases[i];
415 			final byte q=quals[i];
416 			bbBases.append(b);
417 			bbQuals.append(q);
418 			if(b==prevBase){
419 				streak++;
420 			}else{
421 				int adjustment=AminoAcid.isFullyDefined(prevBase) ? (int)(rate*streak) : 0;
422 				if(adjustment<0){
423 					bbBases.length+=adjustment;
424 					bbQuals.length+=adjustment;
425 				}else{
426 					for(; adjustment>0; adjustment--){
427 						bbBases.append(prevBase);
428 						bbQuals.append(prevQual);
429 					}
430 				}
431 				streak=1;
432 			}
433 			prevBase=b;
434 			prevQual=q;
435 		}
436 		if(bbBases.length()!=r.length()){
437 			r.bases=bbBases.toBytes();
438 			r.quality=(r.quality==null ? null : bbQuals.toBytes());
439 		}
440 	}
441 
fakeQuality(int minlen)442 	private byte[] fakeQuality(int minlen){
443 		if(fakeQuality.length<minlen){
444 			fakeQuality=KillSwitch.allocByte1D(minlen+10);
445 			Arrays.fill(fakeQuality, Shared.FAKE_QUAL);
446 		}
447 		return fakeQuality;
448 	}
449 
450 	/*--------------------------------------------------------------*/
451 	/*----------------            Fields            ----------------*/
452 	/*--------------------------------------------------------------*/
453 
454 	/** Primary input file path */
455 	private String in1=null;
456 	/** Secondary input file path */
457 	private String in2=null;
458 
459 	private String qfin1=null;
460 	private String qfin2=null;
461 
462 	/** Primary output file path */
463 	private String out1=null;
464 	/** Secondary output file path */
465 	private String out2=null;
466 
467 	private String qfout1=null;
468 	private String qfout2=null;
469 
470 	/** Override input file extension */
471 	private String extin=null;
472 	/** Override output file extension */
473 	private String extout=null;
474 
475 	/** Whether interleaved was explicitly set. */
476 	private boolean setInterleaved=false;
477 
478 	private ByteBuilder bbBases=new ByteBuilder();
479 	private ByteBuilder bbQuals=new ByteBuilder();
480 
481 	private float rate=0.0f;
482 
483 	private byte[] fakeQuality=new byte[0];
484 
485 	/*--------------------------------------------------------------*/
486 
487 	/** Number of reads processed */
488 	protected long readsProcessed=0;
489 	/** Number of bases processed */
490 	protected long basesProcessed=0;
491 
492 	/** Number of reads retained */
493 	protected long readsOut=0;
494 	/** Number of bases retained */
495 	protected long basesOut=0;
496 
497 	/** Quit after processing this many input reads; -1 means no limit */
498 	private long maxReads=-1;
499 
500 	/*--------------------------------------------------------------*/
501 	/*----------------         Final Fields         ----------------*/
502 	/*--------------------------------------------------------------*/
503 
504 	/** Primary input file */
505 	private final FileFormat ffin1;
506 	/** Secondary input file */
507 	private final FileFormat ffin2;
508 
509 	/** Primary output file */
510 	private final FileFormat ffout1;
511 	/** Secondary output file */
512 	private final FileFormat ffout2;
513 
514 	/*--------------------------------------------------------------*/
515 	/*----------------        Common Fields         ----------------*/
516 	/*--------------------------------------------------------------*/
517 
518 	/** Print status messages to this output stream */
519 	private PrintStream outstream=System.err;
520 	/** Print verbose messages */
521 	public static boolean verbose=false;
522 	/** True if an error was encountered */
523 	public boolean errorState=false;
524 	/** Overwrite existing output files */
525 	private boolean overwrite=false;
526 	/** Append to existing output files */
527 	private boolean append=false;
528 	/** This flag has no effect on singlethreaded programs */
529 	private final boolean ordered=false;
530 
531 }
532