package sort;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

import fileIO.ByteFile;
import fileIO.FileFormat;
import fileIO.ReadWrite;
import shared.KillSwitch;
import shared.Parse;
import shared.Parser;
import shared.PreParser;
import shared.ReadStats;
import shared.Shared;
import shared.Timer;
import shared.Tools;
import stream.ConcurrentReadInputStream;
import stream.ConcurrentReadOutputStream;
import stream.CrisContainer;
import stream.FASTQ;
import stream.FastaReadInputStream;
import stream.Read;
import stream.SamLine;
import structures.ListNum;

/**
 * Shuffles reads into a random order.
 * Reads are accumulated in memory and shuffled.  When temp files are allowed
 * and the in-memory budget is exceeded, each shuffled chunk is written to a
 * temp file by a background thread; the temp files are later merged by
 * repeatedly pulling batches from randomly chosen files and shuffling them.
 *
 * @author Brian Bushnell
 * @date September 21, 2016
 *
 */
public class Shuffle2 {

    /*--------------------------------------------------------------*/
    /*----------------        Initialization        ----------------*/
    /*--------------------------------------------------------------*/

    /**
     * Code entrance from the command line.
     * The static FASTQ interleaving flags are restored, and a redirected
     * print stream is closed, even if processing throws.
     * @param args Command line arguments
     */
    public static void main(String[] args){
        Timer t=new Timer();
        final boolean oldFI=FASTQ.FORCE_INTERLEAVED, oldTI=FASTQ.TEST_INTERLEAVED;
        Shuffle2 x=null;
        try{
            x=new Shuffle2(args);
            x.process(t);
        }finally{
            FASTQ.FORCE_INTERLEAVED=oldFI;
            FASTQ.TEST_INTERLEAVED=oldTI;

            //Close the print stream if it was redirected
            if(x!=null){Shared.closeStream(x.outstream);}
        }
    }

    /**
     * Constructor.  Parses arguments, validates input/output paths, and
     * creates the FileFormat objects.
     * @param args Command line arguments
     */
    public Shuffle2(String[] args){

        {//Preparse block for help, config files, and outstream
            PreParser pp=new PreParser(args, getClass(), false);
            args=pp.args;
            outstream=pp.outstream;
        }

        boolean setInterleaved=false; //Whether interleaved was explicitly set.

        //Set shared static variables
        Shared.capBuffers(4);
        ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
        ReadWrite.MAX_ZIP_THREADS=Shared.threads();
        FASTQ.TEST_INTERLEAVED=true; //TEST_INTERLEAVED must explicitly be disabled with int=f to process corrupt files.
        FASTQ.FORCE_INTERLEAVED=false;

        //Create a parser object
        Parser parser=new Parser();
        //Accepted for command-line compatibility; the value is not used by this class.
        boolean ascending=true;

        //Parse each argument
        for(int i=0; i<args.length; i++){
            String arg=args[i];

            //Break arguments into their constituent parts, in the form of "a=b"
            String[] split=arg.split("=");
            String a=split[0].toLowerCase();
            String b=split.length>1 ? split[1] : null;

            if(a.equals("verbose")){
                verbose=Parse.parseBoolean(b);
            }else if(a.equals("verbose2")){
                assert(false) : "Verbose2 is disabled.";
            }else if(a.equals("delete")){
                delete=Parse.parseBoolean(b);
            }else if(a.equals("allowtemp") || a.equals("usetemp")){
                allowTempFiles=Parse.parseBoolean(b);
            }else if(a.equals("memmult") || a.equals("mult")){
                memMult=(float) Double.parseDouble(b);
            }else if(a.equals("ascending")){
                ascending=Parse.parseBoolean(b);
            }else if(a.equals("descending")){
                ascending=!Parse.parseBoolean(b);
            }else if(a.equals("maxfiles") || a.equals("files")){
                maxFiles=Integer.parseInt(b);
            }else if(a.equals("seed")){
                seed=Long.parseLong(b);
            }else if(a.equals("deterministic")){
                seed=Parse.parseBoolean(b) ? 1 : -1;
            }else if(a.equals("parse_flag_goes_here")){
                //Set a variable here
            }else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
                //do nothing
            }else{
                outstream.println("Unknown parameter "+args[i]);
                assert(false) : "Unknown parameter "+args[i];
            }
        }

        randy=Shared.threadLocalRandom(seed);
        SamLine.SET_FROM_OK=true;

        {//Process parser fields
            Parser.processQuality();

            maxReads=parser.maxReads;

            overwrite=ReadStats.overwrite=parser.overwrite;
            append=ReadStats.append=parser.append;
            setInterleaved=parser.setInterleaved;

            in1=parser.in1;
            in2=parser.in2;
            qfin1=parser.qfin1;
            qfin2=parser.qfin2;

            out1=parser.out1;
            out2=parser.out2;

            extin=parser.extin;
            extout=parser.extout;

            minlen=parser.minReadLength;
        }

        //Do input file # replacement
        if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
            in2=in1.replace("#", "2");
            in1=in1.replace("#", "1");
        }

        //Do output file # replacement
        if(out1!=null && out2==null && out1.indexOf('#')>-1){
            out2=out1.replace("#", "2");
            out1=out1.replace("#", "1");
        }

        //Adjust interleaved detection based on the number of input files
        if(in2!=null){
            if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
            FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
        }

        assert(FastaReadInputStream.settingsOK());

        //Ensure there is an input file
        if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}

        //Adjust the number of threads for input file reading
        if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
            ByteFile.FORCE_MODE_BF2=true;
        }

        //Ensure out2 is not set without out1
        if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");}

        //Adjust interleaved settings based on number of output files
        if(!setInterleaved){
            if(in2==null && out2!=null){
                FASTQ.FORCE_INTERLEAVED=true;
                FASTQ.TEST_INTERLEAVED=false;
            }else if(in2!=null){
                FASTQ.FORCE_INTERLEAVED=false;
                FASTQ.TEST_INTERLEAVED=false;
            }
        }

        //Ensure output files can be written
        if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){
            outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2);
            throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n");
        }

        //Ensure input files can be read
        if(!Tools.testInputFiles(false, true, in1, in2)){
            throw new RuntimeException("\nCan't read some input files.\n");
        }

        //Ensure that no file was specified multiple times
        if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){
            throw new RuntimeException("\nSome file names were specified multiple times.\n");
        }

        //Create input FileFormat objects
        ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
        ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);

        //Create output FileFormat objects
        ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, false);
        ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, false);

        //Choose the temp file extension from the output format unless extout overrides it
        tempExt=".fq.gz";
        if(extout==null){
            if(ffout1!=null){
                tempExt=ffout1.fasta() ? ".fa.gz" : ffout1.samOrBam() ? ".sam" : ".fq.gz";
            }
        }else{
            tempExt=extout;
        }

    }

    /*--------------------------------------------------------------*/
    /*----------------         Outer Methods        ----------------*/
    /*--------------------------------------------------------------*/

    /**
     * Create read streams and process all data.
     * @param t Timer that is stopped and reported once processing completes
     * @throws RuntimeException if any stage reported an error
     */
    void process(Timer t){

        //Create a read input stream
        final ConcurrentReadInputStream cris;
        {
            useSharedHeader=(ffin1.samOrBam() && ffout1!=null && ffout1.samOrBam());
            cris=ConcurrentReadInputStream.getReadInputStream(maxReads, useSharedHeader, ffin1, ffin2, qfin1, qfin2);
            cris.start(); //Start the stream
            if(verbose){outstream.println("Started cris");}
        }
        boolean paired=cris.paired();
        if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}

        //Reset counters
        readsProcessed=0;
        basesProcessed=0;

        //Process the read stream
        processInner(cris);

        if(verbose){outstream.println("Finished; closing streams.");}

        //Write anything that was accumulated by ReadStats
        errorState|=ReadStats.writeAll();
        //Close the read streams
        errorState|=ReadWrite.closeStreams(cris);

        //Report timing and results
        t.stop();
        outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));

        //Throw an exception if there was an error in a thread
        if(errorState){
            throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
        }
    }

    /**
     * Iterate through the reads, accumulating them in memory and dumping
     * shuffled chunks to temp files when the memory budget is exceeded.
     * If nothing was dumped, the single in-memory chunk is shuffled and
     * written directly to out1; otherwise all temp files are merged.
     * @param cris Started input stream to drain
     */
    public void processInner(final ConcurrentReadInputStream cris){
        //Use light compression for temp files; the original level is restored before final output
        final int ziplevel0=ReadWrite.ZIPLEVEL;
        ReadWrite.ZIPLEVEL=Tools.mid(1, ReadWrite.ZIPLEVEL, 2);

        ArrayList<Read> storage=new ArrayList<Read>();

        final long maxMem=Shared.memAvailable(1);
        final long memLimit=(long)(maxMem*.75);
        final long currentLimit=(long)(maxMem*memMult);
        final int readLimit=2000000000;
        long currentMem=0;
        long dumped=0;
        long dumps=0;
        AtomicLong outstandingMem=new AtomicLong();

        if(verbose){outstream.println("maxMem="+maxMem+", memLimit="+memLimit+", currentLimit="+currentLimit+", readLimit="+readLimit);}

        {
            //Grab the first ListNum of reads
            ListNum<Read> ln=cris.nextList();
            //Grab the actual read list from the ListNum
            ArrayList<Read> reads=(ln!=null ? ln.list : null);

            //Check to ensure pairing is as expected
            if(reads!=null && !reads.isEmpty()){
                Read r=reads.get(0);
                assert((ffin1==null || ffin1.samOrBam()) || (r.mate!=null)==cris.paired());
            }

            //As long as there is a nonempty read list...
            while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
                if(verbose2){outstream.println("Fetched "+reads.size()+" reads.");}

                //Loop through each read in the list
                for(int idx=0; idx<reads.size(); idx++){
                    final Read r1=reads.get(idx);
                    final Read r2=r1.mate;

                    //Track the initial length for statistics
                    final int initialLength1=r1.length();
                    final int initialLength2=(r1.mateLength());

                    //Increment counters
                    readsProcessed+=r1.pairCount();
                    basesProcessed+=initialLength1+initialLength2;
                    maxLengthObserved=Tools.max(maxLengthObserved, initialLength1, initialLength2);

                    //Retain the pair if either read meets the minimum length
                    if(minlen<1 || initialLength1>=minlen || initialLength2>=minlen){
                        currentMem+=r1.countBytes()+(r2==null ? 0 : r2.countBytes());
                        storage.add(r1);
                    }
                }

                if(allowTempFiles && (currentMem>=currentLimit || storage.size()>=readLimit)){
                    if(verbose){outstream.println("currentMem: "+currentMem+" >= "+currentLimit+", dumping. ");}
                    outstandingMem.addAndGet(currentMem);
                    shuffleAndDump(storage, currentMem, outstandingMem, null, false);
                    storage=new ArrayList<Read>();
                    dumped+=currentMem;
                    dumps++;
                    currentMem=0;
                    if(verbose){outstream.println("Waiting on memory; outstandingMem="+outstandingMem);}
                    waitOnMemory(outstandingMem, memLimit);
                    if(verbose){outstream.println("Done waiting; outstandingMem="+outstandingMem);}
                }

                //Notify the input stream that the list was used
                cris.returnList(ln);

                //Fetch a new list
                ln=cris.nextList();
                reads=(ln!=null ? ln.list : null);
            }

            //Notify the input stream that the final list was used
            if(ln!=null){
                cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
            }
        }

        outstream.println("Finished reading input.");

        outstandingMem.addAndGet(currentMem);
        if(dumps==0){
            //Everything fit in memory; write the shuffled reads straight to the output
            ReadWrite.ZIPLEVEL=ziplevel0;
            outstream.println("Sorting.");
            if(out1!=null){
                shuffleAndDump(storage, currentMem, outstandingMem, out1, useSharedHeader);
                storage=null;
                waitOnMemory(outstandingMem, 0);
                joinWriteThreads();
            }
        }else{
            //Dump the remainder, wait for every writer to finish, then merge the temp files
            shuffleAndDump(storage, currentMem, outstandingMem, null, false);
            storage=null;
            waitOnMemory(outstandingMem, 0);
            joinWriteThreads();
            outstream.println("Merging "+(dumps+1)+" files.");
            ReadWrite.ZIPLEVEL=ziplevel0;
            //NOTE(review): 'dumped' is a byte total, not a file count; 'dumps' may have been intended here - confirm.
            if(maxLengthObserved*(dumped+1)>200000000L){
                outstream.println("Reduced buffer sizes prior to merging due to low memory.");
                Shared.capBufferLen(4);
                Shared.capBuffers(1);
            }
            errorState|=mergeAndDump(outTemp, useSharedHeader);
        }

    }

    /**
     * Block until the outstanding (unwritten) memory falls to the target or below.
     * The condition is re-checked while holding the monitor, because writers
     * decrement and notify under the same monitor.  An interrupt does not
     * abort the wait; the interrupt flag is restored before returning.
     * @param outstandingMem Bytes handed to write threads and not yet written
     * @param target Maximum outstanding bytes allowed before returning
     */
    private void waitOnMemory(AtomicLong outstandingMem, long target){
        if(outstandingMem.get()<=target){return;}
        if(verbose){outstream.println("Syncing; outstandingMem="+outstandingMem);}
        boolean interrupted=false;
        while(outstandingMem.get()>target){
            try {
                synchronized(outstandingMem){
                    if(outstandingMem.get()>target){
                        outstandingMem.wait(2000);
                    }
                }
            } catch (InterruptedException e) {
                interrupted=true;
            }
        }
        if(interrupted){Thread.currentThread().interrupt();}
    }

    /**
     * Wait for every write thread started so far to terminate, and fold
     * their error states into this object's errorState.
     * This is needed in addition to waitOnMemory because a thread writing an
     * empty chunk contributes zero outstanding bytes.
     */
    private void joinWriteThreads(){
        boolean interrupted=false;
        for(WriteThread wt : writeThreads){
            while(wt.isAlive()){
                try {
                    wt.join(2000);
                } catch (InterruptedException e) {
                    interrupted=true;
                }
            }
            errorState|=wt.errorState;
        }
        writeThreads.clear();
        if(interrupted){Thread.currentThread().interrupt();}
    }

    /*--------------------------------------------------------------*/
    /*----------------         Inner Methods        ----------------*/
    /*--------------------------------------------------------------*/

    /**
     * Repeatedly merge groups of files into new temp files until at most
     * maxFiles remain.
     * @param inList Files to merge
     * @return A list of at most maxFiles file names
     * @throws IllegalArgumentException if maxFiles is less than 2, since the list could never shrink
     */
    private ArrayList<String> mergeRecursive(final ArrayList<String> inList){
        if(maxFiles<2){
            throw new IllegalArgumentException("maxfiles must be at least 2 for a recursive merge; got "+maxFiles);
        }
        ArrayList<String> currentList=inList;
        final int oldZL=ReadWrite.ZIPLEVEL;
        try{
            while(currentList.size()>maxFiles){
                ReadWrite.ZIPLEVEL=Tools.min(ReadWrite.ZIPLEVEL, 4);
                final int size=currentList.size();
                final int groups=(size+maxFiles-1)/maxFiles;
                assert(groups>0 && groups<size);
                ArrayList<ArrayList<String>> listList=new ArrayList<ArrayList<String>>(groups);
                ArrayList<String> outList=new ArrayList<String>(groups);
                for(int i=0; i<groups; i++){
                    listList.add(new ArrayList<String>());
                }
                //Distribute the files round-robin among the groups
                for(int i=0; i<size; i++){
                    listList.get(i%groups).add(currentList.get(i));
                }
                for(ArrayList<String> subList : listList){
                    String temp=getTempFile();
                    FileFormat ff=FileFormat.testOutput(temp, FileFormat.FASTQ, null, true, false, false, false);
                    merge(subList, ff, null);
                    outList.add(temp);
                }
                currentList=outList;
            }
        }finally{
            ReadWrite.ZIPLEVEL=oldZL;
        }
        return currentList;
    }

    /**
     * Merge the listed files into the given output(s), temporarily reducing
     * shared buffer sizes when more than 4 files are merged.
     * Any error is recorded in errorState.
     * @param inList Files to merge
     * @param ff1 Primary output format
     * @param ff2 Secondary output format; may be null
     */
    public void merge(ArrayList<String> inList, FileFormat ff1, FileFormat ff2){
        final int oldBuffers=Shared.numBuffers();
        final int oldBufferLen=Shared.bufferLen();
        try{
            if(inList.size()>4){
                outstream.println("Reduced buffer sizes prior to merging.");
                Shared.capBufferLen(4);
                Shared.capBuffers(1);
            }

            errorState|=mergeAndDump(inList, ff1, ff2, delete, useSharedHeader, outstream);
        }finally{
            Shared.setBufferLen(oldBufferLen);
            Shared.setBuffers(oldBuffers);
        }
    }

    /**
     * Create a new, uniquely named temp file in the working directory.
     * On failure the process is terminated via KillSwitch.
     * @return Path of the temp file
     */
    private String getTempFile(){
        final File dir=new File(".");
        if(!dir.exists()){dir.mkdirs();}
        try {
            return File.createTempFile("sort_temp_", tempExt, dir).toString();
        } catch (IOException e) {
            e.printStackTrace();
            KillSwitch.kill(e.getMessage());
            return null;
        }
    }

    /**
     * Merge the temp files into the final output, first reducing the number
     * of files with a recursive merge if there are too many.
     * @param fnames Temp files to merge
     * @param useHeader Passed through to the output stream
     * @return true if an error was encountered
     */
    private boolean mergeAndDump(ArrayList<String> fnames, boolean useHeader) {
        if(fnames.size()*maxLengthObserved>2000000000 || fnames.size()>64){
            outstream.println("Performing recursive merge to reduce open files.");
            fnames=mergeRecursive(fnames);
        }
        return mergeAndDump(fnames, ffout1, ffout2, delete, useHeader, outstream);
    }

    /**
     * Merge the named files into the given output(s) in shuffled order.
     * Every opened input is closed, including inputs that turn out to be
     * empty, and the shared buffer settings are restored on exit.
     * @param fnames Files to merge
     * @param ffout1 Primary output format; if null nothing is written
     * @param ffout2 Secondary output format; may be null
     * @param delete Delete the input files after merging
     * @param useHeader Passed through to the output stream
     * @param outstream Destination for status messages
     * @return true if an error was encountered
     */
    public boolean mergeAndDump(ArrayList<String> fnames, FileFormat ffout1, FileFormat ffout2, boolean delete, boolean useHeader, PrintStream outstream) {

        final int oldBuffers=Shared.numBuffers();
        final int oldBufferLen=Shared.bufferLen();
        boolean errorState=false;

        try{
            if(fnames.size()>4){
                Shared.capBufferLen(8);
                Shared.setBuffers(1);
            }

            outstream.println("Merging "+fnames);

            ListNum.setDeterministicRandom(false);
            final ConcurrentReadOutputStream ros;
            if(ffout1!=null){
                final int buff=1;
                ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, null, null, buff, null, useHeader);
                ros.start(); //Start the stream
            }else{ros=null;}

            //'opened' tracks every container so all get closed; 'active' holds only nonempty ones
            ArrayList<CrisContainer> opened=new ArrayList<CrisContainer>(fnames.size());
            ArrayList<CrisContainer> active=new ArrayList<CrisContainer>(fnames.size());
            for(String fname : fnames){
                CrisContainer cc=new CrisContainer(fname, null, false);
                opened.add(cc);
                if(cc.peek()!=null){
                    active.add(cc);
                }
            }

            //TODO: Use per-file read counts to approximately restore random values for shuffling.
            mergeAndDump(active, ros, outstream);
            if(verbose){
                outstream.println("Finished processing "+fnames);
            }

            for(CrisContainer cc : opened){
                errorState|=cc.close();
            }
            if(delete){
                for(String fname : fnames){
                    new File(fname).delete();
                }
            }
            if(ros!=null){errorState|=ReadWrite.closeStream(ros);}
        }finally{
            Shared.setBufferLen(oldBufferLen);
            Shared.setBuffers(oldBuffers);
        }

        return errorState;
    }

    /**
     * Drain the containers in random order.  Batches are fetched from randomly
     * selected containers until the buffer holds at least 'limit' reads (or
     * no containers remain); the buffer is then shuffled and sent to output.
     * Both the container choice and the shuffle use 'randy', so a fixed seed
     * applies to the merge step.
     * @param q Nonempty containers; exhausted ones are removed from this list
     * @param ros Output stream; may be null
     * @param outstream Destination for debug messages
     */
    private static void mergeAndDump(final ArrayList<CrisContainer> q, final ConcurrentReadOutputStream ros, PrintStream outstream) {

        for(CrisContainer cc : q){
            assert(!cc.cris().paired()) : FASTQ.TEST_INTERLEAVED+", "+FASTQ.FORCE_INTERLEAVED;
        }

        final int limit=20000;
        ArrayList<Read> buffer=new ArrayList<Read>(2*limit);
        while(!q.isEmpty()){
            if(verbose2){outstream.println("q size: "+q.size());}
            for(int i=0; !q.isEmpty() && (buffer.size()<limit || i<1); i++){//For loop to force it to run at least once
                final int num=randy.nextInt(q.size());
                final CrisContainer cc=q.get(num);
                if(verbose2){outstream.println("Polled a cc.");}
                final ArrayList<Read> list=cc.fetch();
                if(list==null){
                    //This container is exhausted
                    q.remove(num);
                }else{
                    if(verbose2){outstream.println("Grabbed "+list.size()+" reads.");}
                    buffer.addAll(list);
                }
                if(verbose2){outstream.println("Buffer size: "+buffer.size());}
            }
            Collections.shuffle(buffer, randy);
            if(verbose2){outstream.println("Shuffled buffer.");}

            //Hand a copy to the output stream so the buffer can be reused
            ArrayList<Read> list=new ArrayList<Read>(buffer.size());
            list.addAll(buffer);
            if(ros!=null){ros.add(list, 0);}

            buffer.clear();
        }

        assert(buffer.isEmpty());
    }

    /**
     * Start a background thread that shuffles the stored reads and writes them.
     * When a nonnegative seed was specified, each chunk is shuffled with its own
     * Random derived from the seed and the chunk index; otherwise an unseeded
     * shuffle is used.
     * This method is only invoked from processInner in this class.
     * @param storage Reads to shuffle and write; ownership passes to the thread
     * @param currentMem Bytes attributed to storage, released when writing finishes
     * @param outstandingMem Shared counter of bytes awaiting write
     * @param fname Destination file, or null to create and register a temp file
     * @param useHeader Passed through to the output stream
     */
    private void shuffleAndDump(final ArrayList<Read> storage, final long currentMem, final AtomicLong outstandingMem, String fname, boolean useHeader) {
        String temp=fname;
        if(temp==null){
            synchronized(outTemp){
                if(verbose2){outstream.println("Synced to outTemp to make a temp file.");}
                temp=getTempFile();
                if(verbose2){outstream.println("Temp file: "+temp);}
                outTemp.add(temp);
            }
        }

        outstream.println("Created a WriteThread for "+temp);
        final Random shuffler=(seed<0 ? null : new Random(seed+shufflesStarted));
        shufflesStarted++;
        WriteThread wt=new WriteThread(storage, currentMem, outstandingMem, temp, useHeader, outstream, shuffler);
        wt.start();
        writeThreads.add(wt);
    }

    /*--------------------------------------------------------------*/
    /*----------------         Inner Classes        ----------------*/
    /*--------------------------------------------------------------*/

    /** Shuffles a list of reads and writes it to a single file. */
    private static class WriteThread extends Thread{

        /**
         * @param storage_ Reads to shuffle and write
         * @param currentMem_ Bytes to subtract from outstandingMem_ when done
         * @param outstandingMem_ Shared counter, also used as the notification monitor
         * @param fname_ Output file name
         * @param useHeader_ Passed through to the output stream
         * @param outstream_ Destination for status messages
         * @param shuffler_ Source of randomness, or null for an unseeded shuffle
         */
        public WriteThread(final ArrayList<Read> storage_, final long currentMem_, final AtomicLong outstandingMem_,
                String fname_, boolean useHeader_, PrintStream outstream_, Random shuffler_){
            storage=storage_;
            currentMem=currentMem_;
            outstandingMem=outstandingMem_;
            fname=fname_;
            useHeader=useHeader_;
            outstream=outstream_;
            shuffler=shuffler_;
        }

        /**
         * Shuffle and write the reads.  The memory counter is always
         * decremented and waiters notified, even if writing throws, so the
         * owner cannot wait forever; a failure sets errorState.
         */
        @Override
        public void run(){
            boolean success=false;
            try{
                writeShuffled();
                success=true;
            }finally{
                if(!success){errorState=true;}
                storage=null; //Release the list so that retaining this thread is cheap

                synchronized(outstandingMem){
                    outstandingMem.addAndGet(-currentMem);
                    if(verbose){outstream.println("Decremented outstandingMem: "+outstandingMem);}
                    outstandingMem.notifyAll();
                    if(verbose){outstream.println("Notified outstandingMem.");}
                }
            }
        }

        /** Open the output, shuffle the reads, and write them in batches of 200. */
        private void writeShuffled(){
            if(verbose){outstream.println("Started a WriteThread.");}
            //NOTE(review): overwrite is hard-coded to false here, even when fname is the user's output file - confirm.
            final FileFormat ffout=FileFormat.testOutput(fname, FileFormat.FASTQ, null, true, false, false, false);
            final ConcurrentReadOutputStream ros;
            if(ffout!=null){
                final int buff=4;
                ros=ConcurrentReadOutputStream.getStream(ffout, null, null, null, buff, null, useHeader);
                ros.start(); //Start the stream
            }else{ros=null;}

            if(verbose){outstream.println("Started a ros.");}
            if(shuffler==null){
                Collections.shuffle(storage);
            }else{
                Collections.shuffle(storage, shuffler);
            }

            if(verbose){outstream.println("Sorted reads.");}

            final int batch=200;
            ArrayList<Read> buffer=new ArrayList<Read>(batch);
            long id=0;
            for(int i=0, lim=storage.size(); i<lim; i++){
                //Null out each slot as it is consumed so reads can be collected once written
                Read r=storage.set(i, null);
                buffer.add(r);
                if(buffer.size()>=batch){
                    if(ros!=null){ros.add(buffer, id);}
                    id++;
                    buffer=new ArrayList<Read>(batch);
                }
            }
            if(ros!=null && buffer.size()>0){ros.add(buffer, id);}
            errorState|=ReadWrite.closeStream(ros);
            if(verbose){outstream.println("Closed ros.");}
        }

        /** Reads to write; cleared when the thread finishes */
        private ArrayList<Read> storage;
        final long currentMem;
        final AtomicLong outstandingMem;
        final String fname;
        final boolean useHeader;
        /** Source of randomness for the shuffle; null means unseeded */
        final Random shuffler;
        /** True if writing failed; read by the owner after this thread terminates */
        boolean errorState=false;
        final PrintStream outstream;

    }

    /*--------------------------------------------------------------*/
    /*----------------            Fields            ----------------*/
    /*--------------------------------------------------------------*/

    /** Primary input file path */
    private String in1=null;
    /** Secondary input file path */
    private String in2=null;

    private String qfin1=null;
    private String qfin2=null;

    /** Primary output file path */
    private String out1=null;
    /** Secondary output file path */
    private String out2=null;

    /** Names of temp files holding shuffled chunks */
    private ArrayList<String> outTemp=new ArrayList<String>();

    /** Write threads started and not yet joined */
    private final ArrayList<WriteThread> writeThreads=new ArrayList<WriteThread>();

    /** Override input file extension */
    private String extin=null;
    /** Override output file extension */
    private String extout=null;

    /** Extension used for temp files */
    private String tempExt=null;

    /*--------------------------------------------------------------*/

    /** Length of the longest read seen so far */
    long maxLengthObserved=0;

    /** Number of reads processed */
    protected long readsProcessed=0;
    /** Number of bases processed */
    protected long basesProcessed=0;

    /** Quit after processing this many input reads; -1 means no limit */
    private long maxReads=-1;

    /** Delete temp files after they are merged */
    private boolean delete=true;

    private boolean useSharedHeader=false;

    /** Permit dumping chunks to temp files when memory runs low */
    private boolean allowTempFiles=true;

    /** Pairs are kept if either read is at least this long; values below 1 disable the filter */
    private int minlen=0;

    /** Fraction of available memory to fill before dumping a chunk */
    private float memMult=0.35f;

    /** Max files to merge per pass */
    private int maxFiles=16;

    /** Random seed; negative means unseeded */
    private long seed=-1;

    /** Number of chunks handed to write threads so far; offsets the per-chunk seed */
    private long shufflesStarted=0;

    static Random randy=Shared.threadLocalRandom();

    /*--------------------------------------------------------------*/
    /*----------------         Final Fields         ----------------*/
    /*--------------------------------------------------------------*/

    /** Primary input file */
    private final FileFormat ffin1;
    /** Secondary input file */
    private final FileFormat ffin2;

    /** Primary output file */
    private final FileFormat ffout1;
    /** Secondary output file */
    private final FileFormat ffout2;

    /*--------------------------------------------------------------*/
    /*----------------        Common Fields         ----------------*/
    /*--------------------------------------------------------------*/

    /** Print status messages to this output stream */
    private PrintStream outstream=System.err;
    /** Print verbose messages */
    public static boolean verbose=false;
    /** Print extra-verbose messages; disabled at compile time */
    public static final boolean verbose2=false;
    /** True if an error was encountered */
    public boolean errorState=false;
    /** Overwrite existing output files */
    private boolean overwrite=false;
    /** Append to existing output files */
    private boolean append=false;
    /** This flag has no effect on singlethreaded programs */
    private final boolean ordered=false;

}