package template;

import java.io.PrintStream;
import java.util.ArrayList;

import fileIO.ByteFile;
import fileIO.FileFormat;
import fileIO.ReadWrite;
import shared.Parse;
import shared.Parser;
import shared.PreParser;
import shared.ReadStats;
import shared.Shared;
import shared.Timer;
import shared.Tools;
import stream.ConcurrentReadInputStream;
import stream.ConcurrentReadOutputStream;
import stream.FastaReadInputStream;
import stream.Read;
import stream.SamLine;
import stream.SamReadStreamer;
import stream.SamStreamer;
import structures.ListNum;
import var2.Realigner;
import var2.SamFilter;
import var2.ScafMap;

/**
 * This class does nothing.
 * It is designed to be easily modified into a program
 * that processes reads in multiple threads, by
 * filling in the processRead method.
 *
 * The assert(false) : "TODO" statements are deliberate placeholders that
 * fail when assertions are enabled, marking the spots that must be filled in
 * when this template is adapted.
 *
 * @author Brian Bushnell
 * @date September 6, 2019
 *
 */
public class A_SampleSamStreamer implements Accumulator<A_SampleSamStreamer.ProcessThread> {

	/*--------------------------------------------------------------*/
	/*----------------        Initialization        ----------------*/
	/*--------------------------------------------------------------*/

	/**
	 * Code entrance from the command line.
	 * @param args Command line arguments
	 */
	public static void main(String[] args){
		//Start a timer immediately upon code entrance.
		Timer t=new Timer();

		//Create an instance of this class
		A_SampleSamStreamer x=new A_SampleSamStreamer(args);

		//Run the object
		x.process(t);

		//Close the print stream if it was redirected
		Shared.closeStream(x.outstream);
	}

	/**
	 * Constructor.
	 * Parses the arguments, validates them, checks the files, and creates
	 * the FileFormat objects for the input, reference, and output.
	 * @param args Command line arguments
	 */
	public A_SampleSamStreamer(String[] args){

		{//Preparse block for help, config files, and outstream
			PreParser pp=new PreParser(args, getClass(), false);
			args=pp.args;
			outstream=pp.outstream;
		}

		//Set shared static variables prior to parsing
		ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
		ReadWrite.MAX_ZIP_THREADS=Shared.threads();

//		samFilter.includeUnmapped=false;
//		samFilter.includeSupplimentary=false;
//		samFilter.includeDuplicate=false;
//		samFilter.includeNonPrimary=false;
//		samFilter.includeQfail=false;
//		samFilter.minMapq=4;

		{//Parse the arguments
			final Parser parser=parse(args);

			Parser.processQuality();

			maxReads=parser.maxReads;
			overwrite=ReadStats.overwrite=parser.overwrite;
			append=ReadStats.append=parser.append;

			in=parser.in1;
			extin=parser.extin;

			out=parser.out1;
			extout=parser.extout;
		}

		{
//			if("auto".equalsIgnoreCase(atomic)){Scaffold.setCA3A(Shared.threads()>8);}
//			else{Scaffold.setCA3A(Parse.parseBoolean(atomic));}

			if(ploidy<1){System.err.println("WARNING: ploidy not set; assuming ploidy=1."); ploidy=1;}
			samFilter.setSamtoolsFilter();

			//Clamp the streamer thread count to the range [1, Shared.threads()]
			streamerThreads=Tools.max(1, Tools.min(streamerThreads, Shared.threads()));
			assert(streamerThreads>0) : streamerThreads;
		}

		validateParams();
		fixExtensions(); //Add or remove .gz or .bz2 as needed
		checkFileExistence(); //Ensure files can be read and written
		checkStatics(); //Adjust file-related static fields as needed for this program

		//Create output FileFormat objects
		ffout=FileFormat.testOutput(out, FileFormat.SAM, extout, true, overwrite, append, ordered);
		assert(false) : "TODO: Default output format might be fasta.";

		//Create input FileFormat objects
		ffin=FileFormat.testInput(in, FileFormat.SAM, extin, true, true);
		ffref=FileFormat.testInput(ref, FileFormat.FASTA, null, true, true);
	}

	/*--------------------------------------------------------------*/
	/*----------------    Initialization Helpers    ----------------*/
	/*--------------------------------------------------------------*/

	/**
	 * Parse arguments from the command line.
	 * Flags specific to this class are handled first; anything else is
	 * offered to samFilter and then to the standard Parser.
	 * @param args Command line arguments, each in the form of "a=b" or "a"
	 * @return The Parser holding the standard flags that were parsed
	 */
	private Parser parse(String[] args){

		//Create a parser object
		Parser parser=new Parser();

		//Set any necessary Parser defaults here
		//parser.foo=bar;

		//Parse each argument
		for(int i=0; i<args.length; i++){
			String arg=args[i];

			//Break arguments into their constituent parts, in the form of "a=b"
			String[] split=arg.split("=");
			String a=split[0].toLowerCase();
			String b=split.length>1 ? split[1] : null;
			if(b!=null && b.equalsIgnoreCase("null")){b=null;}

			if(a.equals("verbose")){
				verbose=Parse.parseBoolean(b);
			}else if(a.equals("ref")){
				ref=b;
			}else if(a.equals("ordered")){
				ordered=Parse.parseBoolean(b);
			}else if(a.equals("realign")){
				realign=Parse.parseBoolean(b);
			}else if(a.equals("ploidy")){
				ploidy=Integer.parseInt(b);
			}else if(a.equals("clearfilters")){
				if(Parse.parseBoolean(b)){
					samFilter.clear();
				}
			}else if(a.equals("parse_flag_goes_here")){
				long fake_variable=Parse.parseKMG(b);
				//Set a variable here
			}else if(samFilter.parse(arg, a, b)){
				//do nothing
			}else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
				//do nothing
			}else{
				outstream.println("Unknown parameter "+args[i]);
				assert(false) : "Unknown parameter "+args[i];
			}
		}

		return parser;
	}

	/** Add or remove .gz or .bz2 as needed */
	private void fixExtensions(){
		in=Tools.fixExtension(in);
		ref=Tools.fixExtension(ref);
	}

	/**
	 * Ensure files can be read and written.
	 * @throws RuntimeException If a required file is missing, an output cannot
	 * be written, an input cannot be read, or a file was specified more than once.
	 */
	private void checkFileExistence(){

		//Ensure there is an input file
		if(in==null){throw new RuntimeException("Error - an input file is required.");}

		//Ensure there is a reference file
		assert(false) : "TODO: Check.";
		if(ref==null){throw new RuntimeException("Error - a reference file is required.");}

		//Ensure output files can be written
		if(!Tools.testOutputFiles(overwrite, append, false, out)){
			outstream.println((out==null)+", "+out);
			throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out+"\n");
		}

		//Ensure input files can be read
		if(!Tools.testInputFiles(false, true, in, ref)){
			throw new RuntimeException("\nCan't read some input files.\n");
		}

		//Ensure that no file was specified multiple times
		if(!Tools.testForDuplicateFiles(true, in, ref, out)){
			throw new RuntimeException("\nSome file names were specified multiple times.\n");
		}
	}

	/** Adjust file-related static fields as needed for this program */
	private static void checkStatics(){
		//Adjust the number of threads for input file reading
		if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
			ByteFile.FORCE_MODE_BF2=true;
		}

		assert(FastaReadInputStream.settingsOK());
	}

	/**
	 * Ensure parameter ranges are within bounds and required parameters are set.
	 * Currently a placeholder with nothing to validate.
	 * @return Always true
	 */
	private boolean validateParams(){
//		assert(minfoo>0 && minfoo<=maxfoo) : minfoo+", "+maxfoo;
		assert(false) : "TODO";
		return true;
	}

	/*--------------------------------------------------------------*/
	/*----------------         Outer Methods        ----------------*/
	/*--------------------------------------------------------------*/

	/**
	 * Create read streams and process all data.
	 * @param t Timer started at program entry; stopped here to report elapsed time
	 * @throws RuntimeException If errorState is set once processing has finished
	 */
	void process(Timer t){

		//Turn off read validation in the input threads to increase speed
		final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
		Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;

		//Create a read input stream
		final SamStreamer ss=makeStreamer(ffin);

		//Load reference, if desired (and if present);
		loadScafMapFromReference();
//		loadReferenceCustom();

		//Optionally create a read output stream
		final ConcurrentReadOutputStream ros=makeCros();

		//Reset counters
		readsProcessed=readsOut=0;
		basesProcessed=basesOut=0;

		//Process the reads in separate threads
		spawnThreads(ss, ros);

		if(verbose){outstream.println("Finished; closing streams.");}

		//Write anything that was accumulated by ReadStats
		errorState|=ReadStats.writeAll();
		//Close the read streams
		errorState|=ReadWrite.closeStream(ros);

		//Reset read validation
		Read.VALIDATE_IN_CONSTRUCTOR=vic;

		//Report timing and results
		t.stop();
		outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
		outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));

		//Throw an exception if there was an error in a thread
		if(errorState){
			throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
		}
	}

	/**
	 * Load the ScafMap from the reference file, at most once.
	 * When realign is set, the loaded map is also assigned to Realigner.map.
	 */
	private void loadScafMapFromReference(){
		if(loadedRef){return;}
		assert(ref!=null);
		scafMap=ScafMap.loadReference(ref, scafMap, samFilter, true);
		if(realign){Realigner.map=scafMap;}
		loadedRef=true;
	}

	/**
	 * Placeholder for custom reference loading; iterates over the lists
	 * produced by the reference stream.  Not currently called.
	 */
	private void loadReferenceCustom(){
		ConcurrentReadInputStream cris=makeRefCris();
		for(ListNum<Read> ln=cris.nextList(); ln!=null && ln.size()>0; ln=cris.nextList()) {
			//Do something
			assert(false) : "TODO";
		}
	}

	/**
	 * Create and start a read input stream over the reference file.
	 * @return The started stream
	 */
	private ConcurrentReadInputStream makeRefCris(){
		ConcurrentReadInputStream cris=ConcurrentReadInputStream.getReadInputStream(maxReads, true, ffref, null);
		cris.start(); //Start the stream
		if(verbose){outstream.println("Started cris");}
		boolean paired=cris.paired();
		assert(!paired) : "References should not be paired.";
		return cris;
	}

	/**
	 * Create and start a SamStreamer for the input file.
	 * @param ff Input file format (may be null)
	 * @return The started streamer, or null if ff is null
	 */
	private SamStreamer makeStreamer(FileFormat ff){
		if(ff==null){return null;}
		SamStreamer ss=new SamReadStreamer(ff, streamerThreads, true, maxReads);
		ss.start(); //Start the stream
		if(verbose){outstream.println("Started Streamer");}
		return ss;
	}

	/**
	 * Create and start the read output stream.
	 * @return The started stream, or null if there is no output file format
	 */
	private ConcurrentReadOutputStream makeCros(){
		if(ffout==null){return null;}

		//Select output buffer size based on whether it needs to be ordered
		final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);

		final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout, null, buff, null, false);
		ros.start(); //Start the stream
		return ros;
	}

	/*--------------------------------------------------------------*/
	/*----------------       Thread Management      ----------------*/
	/*--------------------------------------------------------------*/

	/**
	 * Spawn process threads, one per available thread, and wait for them.
	 * @param ss Shared input stream
	 * @param ros Shared output stream (may be null)
	 */
	private void spawnThreads(final SamStreamer ss, final ConcurrentReadOutputStream ros){

		//Do anything necessary prior to processing

		//Determine how many threads may be used
		final int threads=Shared.threads();

		//Fill a list with ProcessThreads
		ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
		for(int i=0; i<threads; i++){
			alpt.add(new ProcessThread(ss, ros, i));
		}

		//Start the threads
		for(ProcessThread pt : alpt){
			pt.start();
		}

		//Wait for threads to finish
		boolean success=ThreadWaiter.waitForThreads(alpt, this);
		//A failure must set the error state and a success must never clear an
		//earlier error, so this is OR-ed in like every other errorState update.
		errorState|=!success;

		//Do anything necessary after processing

	}

	/**
	 * Add one thread's counters to the totals and record whether it failed.
	 * @param pt A ProcessThread whose results should be merged
	 */
	@Override
	public final void accumulate(ProcessThread pt){
		readsProcessed+=pt.readsProcessedT;
		basesProcessed+=pt.basesProcessedT;
		readsOut+=pt.readsOutT;
		basesOut+=pt.basesOutT;
		errorState|=(!pt.success);
	}

	/** @return True if no error has been recorded */
	@Override
	public final boolean success(){return !errorState;}

	/*--------------------------------------------------------------*/
	/*----------------         Inner Methods        ----------------*/
	/*--------------------------------------------------------------*/

	/*--------------------------------------------------------------*/
	/*----------------         Inner Classes        ----------------*/
	/*--------------------------------------------------------------*/

	/**
	 * Worker thread that pulls lists of reads from the shared streamer.
	 * This is a non-static inner class because it reads the enclosing
	 * instance's realign and samFilter fields.  Its counters are kept
	 * per-thread and merged by accumulate().
	 */
	class ProcessThread extends Thread {

		/**
		 * Constructor.
		 * @param ss_ Shared input stream
		 * @param ros_ Shared output stream (may be null)
		 * @param tid_ Thread ID
		 */
		ProcessThread(final SamStreamer ss_, final ConcurrentReadOutputStream ros_, final int tid_){
			ss=ss_;
			ros=ros_;
			tid=tid_;
			realigner=(realign ? new Realigner() : null);
		}

		//Called by start()
		@Override
		public void run(){
			//Do anything necessary prior to processing

			//Process the reads
			processInner();

			//Do anything necessary after processing

			//Indicate successful exit status
			//(not reached if processInner throws, so success stays false)
			success=true;
		}

		/** Iterate through the reads */
		void processInner(){

			//Grab and process all lists
			for(ListNum<Read> ln=ss.nextReads(); ln!=null; ln=ss.nextReads()){
//				if(verbose){outstream.println("Got list of size "+list.size());} //Disabled due to non-static access

				processList(ln);
			}

		}

		/**
		 * Process every read in a list, then send the list to the output stream.
		 * Discarded reads are replaced by null in the list.
		 * @param ln List of reads, with its list number
		 */
		void processList(ListNum<Read> ln){

			//Grab the actual read list from the ListNum
			final ArrayList<Read> reads=ln.list;

			//Loop through each read in the list
			for(int idx=0; idx<reads.size(); idx++){
				final Read r=reads.get(idx);

				//Validate reads in worker threads
				if(!r.validated()){r.validate(true);}

				//Track the initial length for statistics
				final int initialLength=r.length();

				//Increment counters
				readsProcessedT+=r.pairCount();
				basesProcessedT+=initialLength;

				{
					//Reads are processed in this block.
					boolean keep=processRead(r);

					if(!keep){reads.set(idx, null);}
					else{
						readsOutT++;
						basesOutT+=r.length();
					}
				}
			}

			//Output reads to the output stream
			if(ros!=null){ros.add(reads, ln.id);}
		}

		/**
		 * Process a read.
		 * Reads with no bases or a length of at most 1, and reads that fail
		 * samFilter, are discarded.
		 * @param r The read to process
		 * @return True if the read should be kept, false if it should be discarded.
		 */
		boolean processRead(final Read r){
			if(r.bases==null || r.length()<=1){return false;}
			final SamLine sl=r.samline;
			if(samFilter!=null && !samFilter.passesFilter(sl)){return false;}

//			System.err.println("A: "+sl);

//			final SamLine oldSL=new SamLine(sl);
//			final Read oldRead=r.clone();

			assert(false) : "TODO";
			return true;
		}

		/** Number of reads processed by this thread */
		protected long readsProcessedT=0;
		/** Number of bases processed by this thread */
		protected long basesProcessedT=0;

		/** Number of reads retained by this thread */
		protected long readsOutT=0;
		/** Number of bases retained by this thread */
		protected long basesOutT=0;

		/** True only if this thread has completed successfully */
		boolean success=false;

		/** Shared input stream */
		private final SamStreamer ss;
		/** Shared output stream */
		private final ConcurrentReadOutputStream ros;
		/** Thread ID */
		final int tid;
		/** For realigning reads; null unless realign is set */
		final Realigner realigner;
	}

	/*--------------------------------------------------------------*/
	/*----------------            Fields            ----------------*/
	/*--------------------------------------------------------------*/

	/** Primary input file path */
	private String in=null;
	/** Secondary input file path (the reference) */
	private String ref=null;

	/** Primary output file path */
	private String out=null;

	/** Override input file extension */
	private String extin=null;
	/** Override output file extension */
	private String extout=null;

	/*--------------------------------------------------------------*/

	/** Number of reads processed */
	protected long readsProcessed=0;
	/** Number of bases processed */
	protected long basesProcessed=0;

	/** Number of reads retained */
	protected long readsOut=0;
	/** Number of bases retained */
	protected long basesOut=0;

	/** Quit after processing this many input reads; -1 means no limit */
	private long maxReads=-1;

	/*--------------------------------------------------------------*/

	/** Threads dedicated to reading the sam file */
	private int streamerThreads=SamStreamer.DEFAULT_THREADS;

	/** True once the reference has been loaded into scafMap */
	private boolean loadedRef=false;

	/** Set by the "realign" flag; when true, each ProcessThread gets a Realigner */
	private boolean realign=false;

	/** Set by the "ploidy" flag; values below 1 are reset to 1 */
	private int ploidy=1;

	/** Populated from the reference by loadScafMapFromReference */
	public ScafMap scafMap;
	/** Filter applied to each read's SamLine in processRead */
	public final SamFilter samFilter=new SamFilter();

	/*--------------------------------------------------------------*/
	/*----------------         Final Fields         ----------------*/
	/*--------------------------------------------------------------*/

	/** Primary input file */
	private final FileFormat ffin;
	/** Secondary input file */
	private final FileFormat ffref;

	/** Primary output file */
	private final FileFormat ffout;

	/*--------------------------------------------------------------*/
	/*----------------        Common Fields         ----------------*/
	/*--------------------------------------------------------------*/

	/** Print status messages to this output stream */
	private PrintStream outstream=System.err;
	/** Print verbose messages */
	public static boolean verbose=false;
	/** True if an error was encountered */
	public boolean errorState=false;
	/** Overwrite existing output files */
	private boolean overwrite=false;
	/** Append to existing output files */
	private boolean append=false;
	/** Reads are output in input order */
	private boolean ordered=false;

}