1 package jgi; 2 3 import java.io.File; 4 import java.io.PrintStream; 5 import java.util.ArrayList; 6 import java.util.Arrays; 7 8 import dna.AminoAcid; 9 import fileIO.ByteFile; 10 import fileIO.FileFormat; 11 import fileIO.ReadWrite; 12 import shared.KillSwitch; 13 import shared.Parse; 14 import shared.Parser; 15 import shared.PreParser; 16 import shared.ReadStats; 17 import shared.Shared; 18 import shared.Timer; 19 import shared.Tools; 20 import stream.ConcurrentReadInputStream; 21 import stream.ConcurrentReadOutputStream; 22 import stream.FASTQ; 23 import stream.FastaReadInputStream; 24 import stream.Read; 25 import structures.ByteBuilder; 26 import structures.ListNum; 27 28 /** 29 * This class does nothing. 30 * It is designed to be easily modified into a program 31 * that processes reads in a single thread. 32 * 33 * @author Brian Bushnell 34 * @date June 20, 2014 35 * 36 */ 37 public class AdjustHomopolymers { 38 39 /*--------------------------------------------------------------*/ 40 /*---------------- Initialization ----------------*/ 41 /*--------------------------------------------------------------*/ 42 43 /** 44 * Code entrance from the command line. 45 * @param args Command line arguments 46 */ main(String[] args)47 public static void main(String[] args){ 48 //Start a timer immediately upon code entrance. 49 Timer t=new Timer(); 50 51 //Create an instance of this class 52 AdjustHomopolymers x=new AdjustHomopolymers(args); 53 54 //Run the object 55 x.process(t); 56 57 //Close the print stream if it was redirected 58 Shared.closeStream(x.outstream); 59 } 60 61 /** 62 * Constructor. 63 * @param args Command line arguments 64 */ AdjustHomopolymers(String[] args)65 public AdjustHomopolymers(String[] args){ 66 67 {//Preparse block for help, config files, and outstream 68 PreParser pp=new PreParser(args, getClass(), false); 69 args=pp.args; 70 outstream=pp.outstream; 71 } 72 73 //Set shared static variables prior to parsing 74 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; 75 ReadWrite.MAX_ZIP_THREADS=Shared.threads(); 76 Shared.capBuffers(4); //Only for singlethreaded programs 77 78 {//Parse the arguments 79 final Parser parser=parse(args); 80 Parser.processQuality(); 81 82 maxReads=parser.maxReads; 83 overwrite=ReadStats.overwrite=parser.overwrite; 84 append=ReadStats.append=parser.append; 85 setInterleaved=parser.setInterleaved; 86 87 in1=parser.in1; 88 in2=parser.in2; 89 qfin1=parser.qfin1; 90 qfin2=parser.qfin2; 91 extin=parser.extin; 92 93 out1=parser.out1; 94 out2=parser.out2; 95 qfout1=parser.qfout1; 96 qfout2=parser.qfout2; 97 extout=parser.extout; 98 } 99 100 assert(rate!=0) : "'rate' should be set to a positive or negative value, such as rate=0.1 to expand homopolymers by 10%."; 101 102 doPoundReplacement(); //Replace # with 1 and 2 103 adjustInterleaving(); //Make sure interleaving agrees with number of input and output files 104 fixExtensions(); //Add or remove .gz or .bz2 as needed 105 checkFileExistence(); //Ensure files can be read and written 106 checkStatics(); //Adjust file-related static fields as needed for this program 107 108 //Create output FileFormat objects 109 ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, ordered); 110 ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, ordered); 111 112 //Create input FileFormat objects 113 ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true); 114 ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true); 115 } 116 117 /*--------------------------------------------------------------*/ 118 /*---------------- Initialization Helpers ----------------*/ 119 /*--------------------------------------------------------------*/ 120 121 /** Parse arguments from the command line */ parse(String[] args)122 private Parser parse(String[] args){ 123 124 //Create a parser object 125 Parser parser=new Parser(); 126 127 //Set any necessary Parser defaults here 128 //parser.foo=bar; 129 130 //Parse each argument 131 for(int i=0; i<args.length; i++){ 132 String arg=args[i]; 133 134 //Break arguments into their constituent parts, in the form of "a=b" 135 String[] split=arg.split("="); 136 String a=split[0].toLowerCase(); 137 String b=split.length>1 ? split[1] : null; 138 if(b!=null && b.equalsIgnoreCase("null")){b=null;} 139 140 if(a.equals("verbose")){ 141 verbose=Parse.parseBoolean(b); 142 }else if(a.equals("rate")){ 143 rate=Float.parseFloat(b); 144 }else if(a.equals("parse_flag_goes_here")){ 145 long fake_variable=Parse.parseKMG(b); 146 //Set a variable here 147 }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser 148 //do nothing 149 }else{ 150 outstream.println("Unknown parameter "+args[i]); 151 assert(false) : "Unknown parameter "+args[i]; 152 } 153 } 154 155 return parser; 156 } 157 158 /** Replace # with 1 and 2 in headers */ doPoundReplacement()159 private void doPoundReplacement(){ 160 //Do input file # replacement 161 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){ 162 in2=in1.replace("#", "2"); 163 in1=in1.replace("#", "1"); 164 } 165 166 //Do output file # replacement 167 if(out1!=null && out2==null && out1.indexOf('#')>-1){ 168 out2=out1.replace("#", "2"); 169 out1=out1.replace("#", "1"); 170 } 171 172 //Ensure there is an input file 173 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} 174 175 //Ensure out2 is not set without out1 176 if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");} 177 } 178 179 /** Add or remove .gz or .bz2 as needed */ fixExtensions()180 private void fixExtensions(){ 181 in1=Tools.fixExtension(in1); 182 in2=Tools.fixExtension(in2); 183 qfin1=Tools.fixExtension(qfin1); 184 qfin2=Tools.fixExtension(qfin2); 185 } 186 187 /** Ensure files can be read and written */ checkFileExistence()188 private void checkFileExistence(){ 189 //Ensure output files can be written 190 if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){ 191 outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2); 192 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n"); 193 } 194 195 //Ensure input files can be read 196 if(!Tools.testInputFiles(false, true, in1, in2)){ 197 throw new RuntimeException("\nCan't read some input files.\n"); 198 } 199 200 //Ensure that no file was specified multiple times 201 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){ 202 throw new RuntimeException("\nSome file names were specified multiple times.\n"); 203 } 204 } 205 206 /** Make sure interleaving agrees with number of input and output files */ adjustInterleaving()207 private void adjustInterleaving(){ 208 //Adjust interleaved detection based on the number of input files 209 if(in2!=null){ 210 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");} 211 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; 212 } 213 214 //Adjust interleaved settings based on number of output files 215 if(!setInterleaved){ 216 assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n"; 217 if(in2!=null){ //If there are 2 input streams. 218 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; 219 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); 220 }else{ //There is one input stream. 221 if(out2!=null){ 222 FASTQ.FORCE_INTERLEAVED=true; 223 FASTQ.TEST_INTERLEAVED=false; 224 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); 225 } 226 } 227 } 228 } 229 230 /** Adjust file-related static fields as needed for this program */ checkStatics()231 private static void checkStatics(){ 232 //Adjust the number of threads for input file reading 233 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ 234 ByteFile.FORCE_MODE_BF2=true; 235 } 236 237 assert(FastaReadInputStream.settingsOK()); 238 } 239 240 /*--------------------------------------------------------------*/ 241 /*---------------- Outer Methods ----------------*/ 242 /*--------------------------------------------------------------*/ 243 244 /** Create read streams and process all data */ process(Timer t)245 void process(Timer t){ 246 247 //Create a read input stream 248 final ConcurrentReadInputStream cris=makeCris(); 249 250 //Optionally create a read output stream 251 final ConcurrentReadOutputStream ros=makeCros(cris.paired()); 252 253 //Reset counters 254 readsProcessed=readsOut=0; 255 basesProcessed=basesOut=0; 256 257 //Process the read stream 258 processInner(cris, ros); 259 260 if(verbose){outstream.println("Finished; closing streams.");} 261 262 //Write anything that was accumulated by ReadStats 263 errorState|=ReadStats.writeAll(); 264 //Close the read streams 265 errorState|=ReadWrite.closeStreams(cris, ros); 266 267 //Report timing and results 268 t.stop(); 269 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); 270 outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false)); 271 272 //Throw an exception of there was an error in a thread 273 if(errorState){ 274 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); 275 } 276 } 277 278 /*--------------------------------------------------------------*/ 279 /*---------------- Inner Methods ----------------*/ 280 /*--------------------------------------------------------------*/ 281 makeCris()282 private ConcurrentReadInputStream makeCris(){ 283 ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2); 284 cris.start(); //Start the stream 285 if(verbose){outstream.println("Started cris");} 286 boolean paired=cris.paired(); 287 if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));} 288 return cris; 289 } 290 makeCros(boolean pairedInput)291 private ConcurrentReadOutputStream makeCros(boolean pairedInput){ 292 if(ffout1==null){return null;} 293 294 //Select output buffer size based on whether it needs to be ordered 295 final int buff=4; 296 297 //Notify user of output mode 298 if(pairedInput && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){ 299 outstream.println("Writing interleaved."); 300 } 301 302 final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false); 303 ros.start(); //Start the stream 304 return ros; 305 } 306 307 /** Iterate through the reads */ processInner(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros)308 void processInner(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){ 309 310 //Do anything necessary prior to processing 311 312 { 313 //Grab the first ListNum of reads 314 ListNum<Read> ln=cris.nextList(); 315 316 //Check to ensure pairing is as expected 317 if(ln!=null && !ln.isEmpty()){ 318 Read r=ln.get(0); 319 assert(ffin1.samOrBam() || (r.mate!=null)==cris.paired()); 320 } 321 322 //As long as there is a nonempty read list... 323 while(ln!=null && ln.size()>0){ 324 // if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} //Disabled due to non-static access 325 326 processList(ln, cris, ros); 327 328 //Fetch a new list 329 ln=cris.nextList(); 330 } 331 332 //Notify the input stream that the final list was used 333 if(ln!=null){ 334 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); 335 } 336 } 337 338 //Do anything necessary after processing 339 340 } 341 342 /** 343 * Process a list of Reads. 344 * @param ln The list. 345 * @param cris Read Input Stream 346 * @param ros Read Output Stream for reads that will be retained 347 */ processList(ListNum<Read> ln, final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros)348 void processList(ListNum<Read> ln, final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){ 349 350 //Grab the actual read list from the ListNum 351 final ArrayList<Read> reads=ln.list; 352 353 //Loop through each read in the list 354 for(int idx=0; idx<reads.size(); idx++){ 355 final Read r1=reads.get(idx); 356 final Read r2=r1.mate; 357 358 //Validate reads in worker threads 359 if(!r1.validated()){r1.validate(true);} 360 if(r2!=null && !r2.validated()){r2.validate(true);} 361 362 //Track the initial length for statistics 363 final int initialLength1=r1.length(); 364 final int initialLength2=r1.mateLength(); 365 366 //Increment counters 367 readsProcessed+=r1.pairCount(); 368 basesProcessed+=initialLength1+initialLength2; 369 370 { 371 //Reads are processed in this block. 372 boolean keep=processReadPair(r1, r2); 373 374 if(!keep){reads.set(idx, null);} 375 else{ 376 readsOut+=r1.pairCount(); 377 basesOut+=r1.pairLength(); 378 } 379 } 380 } 381 382 //Output reads to the output stream 383 if(ros!=null){ros.add(reads, ln.id);} 384 385 //Notify the input stream that the list was used 386 cris.returnList(ln); 387 // if(verbose){outstream.println("Returned a list.");} //Disabled due to non-static access 388 } 389 390 391 /** 392 * Process a single read pair. 393 * @param r1 Read 1 394 * @param r2 Read 2 (may be null) 395 * @return True if the reads should be kept, false if they should be discarded. 396 */ processReadPair(final Read r1, final Read r2)397 boolean processReadPair(final Read r1, final Read r2){ 398 processRead(r1); 399 processRead(r2); 400 return true; 401 } 402 processRead(final Read r)403 void processRead(final Read r){ 404 if(r==null){return;} 405 bbBases.clear(); 406 bbQuals.clear(); 407 final byte[] bases=r.bases; 408 final byte[] quals=(r.quality==null ? fakeQuality(bases.length) : r.quality); 409 410 byte prevBase='?'; 411 byte prevQual=20; 412 byte streak=0; 413 for(int i=0; i<bases.length; i++){ 414 final byte b=bases[i]; 415 final byte q=quals[i]; 416 bbBases.append(b); 417 bbQuals.append(q); 418 if(b==prevBase){ 419 streak++; 420 }else{ 421 int adjustment=AminoAcid.isFullyDefined(prevBase) ? (int)(rate*streak) : 0; 422 if(adjustment<0){ 423 bbBases.length+=adjustment; 424 bbQuals.length+=adjustment; 425 }else{ 426 for(; adjustment>0; adjustment--){ 427 bbBases.append(prevBase); 428 bbQuals.append(prevQual); 429 } 430 } 431 streak=1; 432 } 433 prevBase=b; 434 prevQual=q; 435 } 436 if(bbBases.length()!=r.length()){ 437 r.bases=bbBases.toBytes(); 438 r.quality=(r.quality==null ? null : bbQuals.toBytes()); 439 } 440 } 441 fakeQuality(int minlen)442 private byte[] fakeQuality(int minlen){ 443 if(fakeQuality.length<minlen){ 444 fakeQuality=KillSwitch.allocByte1D(minlen+10); 445 Arrays.fill(fakeQuality, Shared.FAKE_QUAL); 446 } 447 return fakeQuality; 448 } 449 450 /*--------------------------------------------------------------*/ 451 /*---------------- Fields ----------------*/ 452 /*--------------------------------------------------------------*/ 453 454 /** Primary input file path */ 455 private String in1=null; 456 /** Secondary input file path */ 457 private String in2=null; 458 459 private String qfin1=null; 460 private String qfin2=null; 461 462 /** Primary output file path */ 463 private String out1=null; 464 /** Secondary output file path */ 465 private String out2=null; 466 467 private String qfout1=null; 468 private String qfout2=null; 469 470 /** Override input file extension */ 471 private String extin=null; 472 /** Override output file extension */ 473 private String extout=null; 474 475 /** Whether interleaved was explicitly set. */ 476 private boolean setInterleaved=false; 477 478 private ByteBuilder bbBases=new ByteBuilder(); 479 private ByteBuilder bbQuals=new ByteBuilder(); 480 481 private float rate=0.0f; 482 483 private byte[] fakeQuality=new byte[0]; 484 485 /*--------------------------------------------------------------*/ 486 487 /** Number of reads processed */ 488 protected long readsProcessed=0; 489 /** Number of bases processed */ 490 protected long basesProcessed=0; 491 492 /** Number of reads retained */ 493 protected long readsOut=0; 494 /** Number of bases retained */ 495 protected long basesOut=0; 496 497 /** Quit after processing this many input reads; -1 means no limit */ 498 private long maxReads=-1; 499 500 /*--------------------------------------------------------------*/ 501 /*---------------- Final Fields ----------------*/ 502 /*--------------------------------------------------------------*/ 503 504 /** Primary input file */ 505 private final FileFormat ffin1; 506 /** Secondary input file */ 507 private final FileFormat ffin2; 508 509 /** Primary output file */ 510 private final FileFormat ffout1; 511 /** Secondary output file */ 512 private final FileFormat ffout2; 513 514 /*--------------------------------------------------------------*/ 515 /*---------------- Common Fields ----------------*/ 516 /*--------------------------------------------------------------*/ 517 518 /** Print status messages to this output stream */ 519 private PrintStream outstream=System.err; 520 /** Print verbose messages */ 521 public static boolean verbose=false; 522 /** True if an error was encountered */ 523 public boolean errorState=false; 524 /** Overwrite existing output files */ 525 private boolean overwrite=false; 526 /** Append to existing output files */ 527 private boolean append=false; 528 /** This flag has no effect on singlethreaded programs */ 529 private final boolean ordered=false; 530 531 } 532