1 package tax; 2 3 import java.io.File; 4 import java.io.PrintStream; 5 import java.util.ArrayList; 6 import java.util.LinkedHashSet; 7 8 import fileIO.ByteFile; 9 import fileIO.FileFormat; 10 import fileIO.ReadWrite; 11 import fileIO.TextStreamWriter; 12 import shared.Parse; 13 import shared.Parser; 14 import shared.PreParser; 15 import shared.ReadStats; 16 import shared.Shared; 17 import shared.Timer; 18 import shared.Tools; 19 import stream.ConcurrentReadInputStream; 20 import stream.ConcurrentReadOutputStream; 21 import stream.FASTQ; 22 import stream.FastaReadInputStream; 23 import stream.Read; 24 import structures.ListNum; 25 26 /** 27 * Filters sequences according to their taxonomy, 28 * as determined by the sequence name. Sequences should 29 * be labeled with a gi number or NCBI taxID. 30 * 31 * @author Brian Bushnell 32 * @date November 23, 2015 33 * 34 */ 35 public class FilterByTaxa { 36 37 /*--------------------------------------------------------------*/ 38 /*---------------- Initialization ----------------*/ 39 /*--------------------------------------------------------------*/ 40 41 /** 42 * Code entrance from the command line. 43 * @param args Command line arguments 44 */ main(String[] args)45 public static void main(String[] args){ 46 Timer t=new Timer(); 47 FilterByTaxa x=new FilterByTaxa(args); 48 x.process(t); 49 50 //Close the print stream if it was redirected 51 Shared.closeStream(x.outstream); 52 } 53 54 /** 55 * Constructor. 56 * @param args Command line arguments 57 */ FilterByTaxa(String[] args)58 public FilterByTaxa(String[] args){ 59 60 {//Preparse block for help, config files, and outstream 61 PreParser pp=new PreParser(args, getClass(), false); 62 args=pp.args; 63 outstream=pp.outstream; 64 } 65 66 boolean setInterleaved=false; //Whether interleaved was explicitly set. 67 68 //Set shared static variables 69 Shared.capBuffers(4); 70 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; 71 ReadWrite.MAX_ZIP_THREADS=Shared.threads(); 72 73 //Create a parser object 74 Parser parser=new Parser(); 75 76 //Parse each argument 77 for(int i=0; i<args.length; i++){ 78 String arg=args[i]; 79 80 //Break arguments into their constituent parts, in the form of "a=b" 81 String[] split=arg.split("="); 82 String a=split[0].toLowerCase(); 83 String b=split.length>1 ? split[1] : null; 84 85 if(parser.parse(arg, a, b)){//Parse standard flags in the parser 86 //do nothing 87 }else if(a.equals("verbose")){ 88 verbose=Parse.parseBoolean(b); 89 }else if(a.equals("besteffort")){ 90 bestEffort=Parse.parseBoolean(b); 91 }else if(a.equals("results") || a.equals("result")){ 92 resultsFile=b; 93 }else if(TaxFilter.validArgument(a)){ 94 //do nothing 95 }else{ 96 outstream.println("Unknown parameter "+args[i]); 97 assert(false) : "Unknown parameter "+args[i]; 98 // throw new RuntimeException("Unknown parameter "+args[i]); 99 } 100 } 101 102 if(resultsFile!=null){nodes=new LinkedHashSet<TaxNode>();} 103 104 {//Process parser fields 105 Parser.processQuality(); 106 107 maxReads=parser.maxReads; 108 109 overwrite=ReadStats.overwrite=parser.overwrite; 110 append=ReadStats.append=parser.append; 111 setInterleaved=parser.setInterleaved; 112 113 in1=parser.in1; 114 in2=parser.in2; 115 qfin1=parser.qfin1; 116 qfin2=parser.qfin2; 117 118 out1=parser.out1; 119 out2=parser.out2; 120 qfout1=parser.qfout1; 121 qfout2=parser.qfout2; 122 123 extin=parser.extin; 124 extout=parser.extout; 125 } 126 127 //Do input file # replacement 128 if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){ 129 in2=in1.replace("#", "2"); 130 in1=in1.replace("#", "1"); 131 } 132 133 //Do output file # replacement 134 if(out1!=null && out2==null && out1.indexOf('#')>-1){ 135 out2=out1.replace("#", "2"); 136 out1=out1.replace("#", "1"); 137 } 138 139 //Adjust interleaved detection based on the number of input files 140 if(in2!=null){ 141 if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");} 142 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; 143 } 144 145 assert(FastaReadInputStream.settingsOK()); 146 147 //Ensure there is an input file 148 if(in1==null){throw new RuntimeException("Error - at least one input file is required.");} 149 150 //Adjust the number of threads for input file reading 151 if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){ 152 ByteFile.FORCE_MODE_BF2=true; 153 } 154 155 //Ensure out2 is not set without out1 156 if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");} 157 158 //Adjust interleaved settings based on number of output files 159 if(!setInterleaved){ 160 assert(in1!=null && (out1!=null || out2==null)) : "\nin1="+in1+"\nin2="+in2+"\nout1="+out1+"\nout2="+out2+"\n"; 161 if(in2!=null){ //If there are 2 input streams. 162 FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false; 163 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); 164 }else{ //There is one input stream. 165 if(out2!=null){ 166 FASTQ.FORCE_INTERLEAVED=true; 167 FASTQ.TEST_INTERLEAVED=false; 168 outstream.println("Set INTERLEAVED to "+FASTQ.FORCE_INTERLEAVED); 169 } 170 } 171 } 172 173 //Ensure output files can be written 174 if(!Tools.testOutputFiles(overwrite, append, false, out1, out2, resultsFile)){ 175 outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2+", "+resultsFile); 176 throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n"); 177 } 178 179 //Ensure input files can be read 180 if(!Tools.testInputFiles(false, true, in1, in2)){ 181 throw new RuntimeException("\nCan't read some input files.\n"); 182 } 183 184 //Ensure that no file was specified multiple times 185 if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2, resultsFile)){ 186 throw new RuntimeException("\nSome file names were specified multiple times.\n"); 187 } 188 189 //Create output FileFormat objects 190 ffout1=FileFormat.testOutput(out1, FileFormat.FASTA, extout, true, overwrite, append, false); 191 ffout2=FileFormat.testOutput(out2, FileFormat.FASTA, extout, true, overwrite, append, false); 192 193 //Create input FileFormat objects 194 ffin1=FileFormat.testInput(in1, FileFormat.FASTA, extin, true, true); 195 ffin2=FileFormat.testInput(in2, FileFormat.FASTA, extin, true, true); 196 197 //Make the actual filter 198 filter=TaxFilter.makeFilter(args); 199 200 //Widen filter to ensure matches 201 if(bestEffort){filter.reviseByBestEffort(in1);} 202 } 203 204 /*--------------------------------------------------------------*/ 205 /*---------------- Outer Methods ----------------*/ 206 /*--------------------------------------------------------------*/ 207 208 /** Create read streams and process all data */ process(Timer t)209 public void process(Timer t){ 210 211 //Create a read input stream 212 final ConcurrentReadInputStream cris; 213 { 214 cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffin1, ffin2, qfin1, qfin2); 215 cris.start(); //Start the stream 216 if(verbose){outstream.println("Started cris");} 217 } 218 boolean paired=cris.paired(); 219 if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));} 220 221 //Optionally create a read output stream 222 final ConcurrentReadOutputStream ros; 223 if(ffout1!=null){ 224 final int buff=4; 225 226 if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){ 227 outstream.println("Writing interleaved."); 228 } 229 230 ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false); 231 ros.start(); //Start the stream 232 }else{ros=null;} 233 234 //Reset counters 235 readsProcessed=0; 236 basesProcessed=0; 237 238 //Process the read stream 239 processInner(cris, ros); 240 241 if(verbose){outstream.println("Finished; closing streams.");} 242 243 //Write anything that was accumulated by ReadStats 244 errorState|=ReadStats.writeAll(); 245 //Close the read streams 246 errorState|=ReadWrite.closeStreams(cris, ros); 247 248 //Report timing and results 249 { 250 t.stop(); 251 252 String ri="Reads In: \t"+readsProcessed+" reads"; 253 String ro="Reads Out: \t"+readsOut+" reads"; 254 while(ro.length()<ri.length()){ro=ro+" ";} 255 256 outstream.println(ri+"\t"+basesProcessed+" bases"); 257 outstream.println(ro+"\t"+basesOut+" bases"); 258 outstream.println(); 259 outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8)); 260 } 261 262 //Throw an exception of there was an error in a thread 263 if(errorState){ 264 throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt."); 265 } 266 } 267 268 /** Iterate through the reads */ processInner(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros)269 void processInner(final ConcurrentReadInputStream cris, final ConcurrentReadOutputStream ros){ 270 271 //Do anything necessary prior to processing 272 273 { 274 //Grab the first ListNum of reads 275 ListNum<Read> ln=cris.nextList(); 276 //Grab the actual read list from the ListNum 277 ArrayList<Read> reads=(ln!=null ? ln.list : null); 278 279 //Check to ensure pairing is as expected 280 if(reads!=null && !reads.isEmpty()){ 281 Read r=reads.get(0); 282 assert((ffin1==null || ffin1.samOrBam()) || (r.mate!=null)==cris.paired()); 283 } 284 285 //As long as there is a nonempty read list... 286 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning 287 if(verbose){outstream.println("Fetched "+reads.size()+" reads.");} 288 289 //Loop through each read in the list 290 for(int idx=0; idx<reads.size(); idx++){ 291 final Read r1=reads.get(idx); 292 final Read r2=r1.mate; 293 294 //Track the initial length for statistics 295 final int initialLength1=r1.length(); 296 final int initialLength2=(r1.mateLength()); 297 298 //Increment counters 299 readsProcessed+=r1.pairCount(); 300 basesProcessed+=initialLength1+initialLength2; 301 302 boolean keep=processReadPair(r1, r2); 303 if(!keep){reads.set(idx, null);} 304 else{ 305 readsOut+=r1.pairCount(); 306 basesOut+=initialLength1+initialLength2; 307 } 308 } 309 310 //Output reads to the output stream 311 if(ros!=null){ros.add(reads, ln.id);} 312 313 //Notify the input stream that the list was used 314 cris.returnList(ln); 315 if(verbose){outstream.println("Returned a list.");} 316 317 //Fetch a new list 318 ln=cris.nextList(); 319 reads=(ln!=null ? ln.list : null); 320 } 321 322 //Notify the input stream that the final list was used 323 if(ln!=null){ 324 cris.returnList(ln.id, ln.list==null || ln.list.isEmpty()); 325 } 326 } 327 328 //Do anything necessary after processing 329 330 if(resultsFile!=null){ 331 TextStreamWriter tsw=new TextStreamWriter(resultsFile, overwrite, append, false); 332 tsw.start(); 333 for(TaxNode tn : nodes){ 334 tsw.println(tn.id+"\t"+tn.levelStringExtended(false)+"\t"+tn.name); 335 } 336 errorState|=tsw.poisonAndWait(); 337 } 338 } 339 340 /*--------------------------------------------------------------*/ 341 /*---------------- Inner Methods ----------------*/ 342 /*--------------------------------------------------------------*/ 343 344 /** 345 * Process a single read pair. 346 * @param r1 Read 1 347 * @param r2 Read 2 (may be null) 348 * @return True if the reads should be kept, false if they should be discarded. 349 */ processReadPair(final Read r1, final Read r2)350 boolean processReadPair(final Read r1, final Read r2){ 351 boolean b=filter.passesFilter(r1.id); 352 if(b && nodes!=null){ 353 TaxNode tn=filter.tree().parseNodeFromHeader(r1.id, true); 354 if(tn!=null){nodes.add(tn);} 355 } 356 return b; 357 } 358 359 /*--------------------------------------------------------------*/ 360 /*---------------- Fields ----------------*/ 361 /*--------------------------------------------------------------*/ 362 363 /** Primary input file path */ 364 private String in1=null; 365 /** Secondary input file path */ 366 private String in2=null; 367 368 private String qfin1=null; 369 private String qfin2=null; 370 371 /** Primary output file path */ 372 private String out1=null; 373 /** Secondary output file path */ 374 private String out2=null; 375 376 private String qfout1=null; 377 private String qfout2=null; 378 379 /** Override input file extension */ 380 private String extin=null; 381 /** Override output file extension */ 382 private String extout=null; 383 384 /** The actual filter */ 385 private final TaxFilter filter; 386 387 /** Recur at a higher level until some sequence matches. Intended for include mode. */ 388 public boolean bestEffort=false; 389 390 /** For listing what is present in the output */ 391 public String resultsFile=null; 392 393 public LinkedHashSet<TaxNode> nodes=null; 394 395 /*--------------------------------------------------------------*/ 396 397 /** Number of reads processed */ 398 protected long readsProcessed=0; 399 /** Number of bases processed */ 400 protected long basesProcessed=0; 401 402 /** Number of reads out */ 403 public long readsOut=0; 404 /** Number of bases out */ 405 public long basesOut=0; 406 407 /** Quit after processing this many input reads; -1 means no limit */ 408 private long maxReads=-1; 409 410 /*--------------------------------------------------------------*/ 411 /*---------------- Final Fields ----------------*/ 412 /*--------------------------------------------------------------*/ 413 414 /** Primary input file */ 415 private final FileFormat ffin1; 416 /** Secondary input file */ 417 private final FileFormat ffin2; 418 419 /** Primary output file */ 420 private final FileFormat ffout1; 421 /** Secondary output file */ 422 private final FileFormat ffout2; 423 424 /*--------------------------------------------------------------*/ 425 /*---------------- Common Fields ----------------*/ 426 /*--------------------------------------------------------------*/ 427 428 /** Print status messages to this output stream */ 429 private PrintStream outstream=System.err; 430 /** Print verbose messages */ 431 public static boolean verbose=false; 432 /** True if an error was encountered */ 433 public boolean errorState=false; 434 /** Overwrite existing output files */ 435 private boolean overwrite=false; 436 /** Append to existing output files */ 437 private boolean append=false; 438 /** This flag has no effect on singlethreaded programs */ 439 private final boolean ordered=false; 440 441 } 442