1 package sort;
2 
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.PrintStream;
6 import java.util.ArrayList;
7 import java.util.Collections;
8 import java.util.Random;
9 import java.util.concurrent.atomic.AtomicLong;
10 
11 import fileIO.ByteFile;
12 import fileIO.FileFormat;
13 import fileIO.ReadWrite;
14 import shared.KillSwitch;
15 import shared.Parse;
16 import shared.Parser;
17 import shared.PreParser;
18 import shared.ReadStats;
19 import shared.Shared;
20 import shared.Timer;
21 import shared.Tools;
22 import stream.ConcurrentReadInputStream;
23 import stream.ConcurrentReadOutputStream;
24 import stream.CrisContainer;
25 import stream.FASTQ;
26 import stream.FastaReadInputStream;
27 import stream.Read;
28 import stream.SamLine;
29 import structures.ListNum;
30 
31 /**
 * Randomly shuffles the order of reads, dumping shuffled chunks to temp files and merging them when the input does not fit in the memory budget.
33  *
34  * @author Brian Bushnell
35  * @date September 21, 2016
36  *
37  */
38 public class Shuffle2 {
39 
40 	/*--------------------------------------------------------------*/
41 	/*----------------        Initialization        ----------------*/
42 	/*--------------------------------------------------------------*/
43 
44 	/**
45 	 * Code entrance from the command line.
46 	 * @param args Command line arguments
47 	 */
main(String[] args)48 	public static void main(String[] args){
49 		Timer t=new Timer();
50 		final boolean oldFI=FASTQ.FORCE_INTERLEAVED, oldTI=FASTQ.TEST_INTERLEAVED;
51 		Shuffle2 x=new Shuffle2(args);
52 		x.process(t);
53 		FASTQ.FORCE_INTERLEAVED=oldFI;
54 		FASTQ.TEST_INTERLEAVED=oldTI;
55 
56 		//Close the print stream if it was redirected
57 		Shared.closeStream(x.outstream);
58 	}
59 
60 	/**
61 	 * Constructor.
62 	 * @param args Command line arguments
63 	 */
	public Shuffle2(String[] args){
		//Parses the command line, configures shared static settings, validates the
		//input/output paths, and builds the FileFormat objects used by process().
		//Throws RuntimeException if no input is given, out2 is given without out1,
		//or any of the file checks below fail.

		{//Preparse block for help, config files, and outstream
			PreParser pp=new PreParser(args, getClass(), false);
			args=pp.args;
			outstream=pp.outstream;
		}

		boolean setInterleaved=false; //Whether interleaved was explicitly set.

		//Set shared static variables
		Shared.capBuffers(4);
		ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
		ReadWrite.MAX_ZIP_THREADS=Shared.threads();
		FASTQ.TEST_INTERLEAVED=true; //TEST_INTERLEAVED must explicitly be disabled with int=f to sort corrupt files.
		FASTQ.FORCE_INTERLEAVED=false;

		//Create a parser object
		Parser parser=new Parser();
		//NOTE(review): 'ascending' is assigned by the ascending/descending flags below
		//but is never read afterwards in this constructor.
		boolean ascending=true;

		//Parse each argument
		for(int i=0; i<args.length; i++){
			String arg=args[i];

			//Break arguments into their constituent parts, in the form of "a=b"
			//Only split[1] is used as the value, so anything after a second '=' is dropped.
			String[] split=arg.split("=");
			String a=split[0].toLowerCase();
			String b=split.length>1 ? split[1] : null;

			if(a.equals("verbose")){
				verbose=Parse.parseBoolean(b);
			}else if(a.equals("verbose2")){
				//verbose2 is a compile-time constant, so the flag is rejected (only when assertions are enabled)
				assert(false) : "Verbose2 is disabled.";
//				verbose2=Parse.parseBoolean(b);
			}else if(a.equals("delete")){
				delete=Parse.parseBoolean(b);
			}else if(a.equals("allowtemp") || a.equals("usetemp")){
				allowTempFiles=Parse.parseBoolean(b);
			}else if(a.equals("memmult") || a.equals("mult")){
				memMult=(float) Double.parseDouble(b);
			}else if(a.equals("ascending")){
				ascending=Parse.parseBoolean(b);
			}else if(a.equals("descending")){
				ascending=!Parse.parseBoolean(b);
			}else if(a.equals("maxfiles") || a.equals("files")){
				maxFiles=Integer.parseInt(b);
			}else if(a.equals("seed")){
				seed=Long.parseLong(b);
			}else if(a.equals("deterministic")){
				//deterministic=t selects the fixed seed 1; deterministic=f selects -1
				seed=Parse.parseBoolean(b) ? 1 : -1;
			}else if(a.equals("parse_flag_goes_here")){
				//Set a variable here
			}else if(parser.parse(arg, a, b)){//Parse standard flags in the parser
				//do nothing
			}else{
				//Unknown flags are reported; they abort only when assertions are enabled
				outstream.println("Unknown parameter "+args[i]);
				assert(false) : "Unknown parameter "+args[i];
				//				throw new RuntimeException("Unknown parameter "+args[i]);
			}
		}

		//Replace the static random source with one built from the parsed seed
		randy=Shared.threadLocalRandom(seed);
		SamLine.SET_FROM_OK=true;

		{//Process parser fields
			Parser.processQuality();

			maxReads=parser.maxReads;

			overwrite=ReadStats.overwrite=parser.overwrite;
			append=ReadStats.append=parser.append;
			setInterleaved=parser.setInterleaved;

			in1=parser.in1;
			in2=parser.in2;
			qfin1=parser.qfin1;
			qfin2=parser.qfin2;

			out1=parser.out1;
			out2=parser.out2;

			extin=parser.extin;
			extout=parser.extout;

			minlen=parser.minReadLength;
		}

//		assert(false) : setInterleaved+", "+FASTQ.FORCE_INTERLEAVED+", "+FASTQ.TEST_INTERLEAVED;
//		assert(!FASTQ.FORCE_INTERLEAVED) : setInterleaved+", "+FASTQ.FORCE_INTERLEAVED+", "+FASTQ.TEST_INTERLEAVED;
//		assert(!FASTQ.TEST_INTERLEAVED) : setInterleaved+", "+FASTQ.FORCE_INTERLEAVED+", "+FASTQ.TEST_INTERLEAVED;

		//Do input file # replacement
		//Only applied when no file literally named with '#' exists
		if(in1!=null && in2==null && in1.indexOf('#')>-1 && !new File(in1).exists()){
			in2=in1.replace("#", "2");
			in1=in1.replace("#", "1");
		}

		//Do output file # replacement
		if(out1!=null && out2==null && out1.indexOf('#')>-1){
			out2=out1.replace("#", "2");
			out1=out1.replace("#", "1");
		}

		//Adjust interleaved detection based on the number of input files
		if(in2!=null){
			if(FASTQ.FORCE_INTERLEAVED){outstream.println("Reset INTERLEAVED to false because paired input files were specified.");}
			FASTQ.FORCE_INTERLEAVED=FASTQ.TEST_INTERLEAVED=false;
		}

		assert(FastaReadInputStream.settingsOK());

		//Ensure there is an input file
		if(in1==null){throw new RuntimeException("Error - at least one input file is required.");}

		//Adjust the number of threads for input file reading
		if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
			ByteFile.FORCE_MODE_BF2=true;
		}

		//Ensure out2 is not set without out1
		if(out1==null && out2!=null){throw new RuntimeException("Error - cannot define out2 without defining out1.");}

		//Adjust interleaved settings based on number of output files
		if(!setInterleaved){
			if(in2==null && out2!=null){
				//One input but two outputs: treat the single input as interleaved
				FASTQ.FORCE_INTERLEAVED=true;
				FASTQ.TEST_INTERLEAVED=false;
			}else if(in2!=null){
				FASTQ.FORCE_INTERLEAVED=false;
				FASTQ.TEST_INTERLEAVED=false;
			}
		}

		//Ensure output files can be written
		if(!Tools.testOutputFiles(overwrite, append, false, out1, out2)){
			outstream.println((out1==null)+", "+(out2==null)+", "+out1+", "+out2);
			throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output files "+out1+", "+out2+"\n");
		}

		//Ensure input files can be read
		if(!Tools.testInputFiles(false, true, in1, in2)){
			throw new RuntimeException("\nCan't read some input files.\n");
		}

		//Ensure that no file was specified multiple times
		if(!Tools.testForDuplicateFiles(true, in1, in2, out1, out2)){
			throw new RuntimeException("\nSome file names were specified multiple times.\n");
		}

		//Create input FileFormat objects
		ffin1=FileFormat.testInput(in1, FileFormat.FASTQ, extin, true, true);
		ffin2=FileFormat.testInput(in2, FileFormat.FASTQ, extin, true, true);

		//Create output FileFormat objects
		ffout1=FileFormat.testOutput(out1, FileFormat.FASTQ, extout, true, overwrite, append, false);
		ffout2=FileFormat.testOutput(out2, FileFormat.FASTQ, extout, true, overwrite, append, false);

		//Choose the suffix for temp files: an explicit extout wins; otherwise it follows
		//the primary output format, defaulting to gzipped fastq.
		//NOTE(review): extout is used verbatim as the suffix - confirm it includes the leading '.'.
		tempExt=".fq.gz";
		if(extout==null){
			if(ffout1!=null){
				tempExt=ffout1.fasta() ? ".fa.gz" : ffout1.samOrBam() ? ".sam" : ".fq.gz";
			}
		}else{
			tempExt=extout;
		}

	}
232 
233 	/*--------------------------------------------------------------*/
234 	/*----------------         Outer Methods        ----------------*/
235 	/*--------------------------------------------------------------*/
236 
237 	/** Create read streams and process all data */
process(Timer t)238 	void process(Timer t){
239 
240 		//Create a read input stream
241 		final ConcurrentReadInputStream cris;
242 		{
243 			useSharedHeader=(ffin1.samOrBam() && ffout1!=null && ffout1.samOrBam());
244 			cris=ConcurrentReadInputStream.getReadInputStream(maxReads, useSharedHeader, ffin1, ffin2, qfin1, qfin2);
245 			cris.start(); //Start the stream
246 			if(verbose){outstream.println("Started cris");}
247 		}
248 		boolean paired=cris.paired();
249 		if(!ffin1.samOrBam()){outstream.println("Input is being processed as "+(paired ? "paired" : "unpaired"));}
250 
251 //		//Optionally create a read output stream
252 //		final ConcurrentReadOutputStream ros;
253 //		if(ffout1!=null){
254 //			final int buff=4;
255 //
256 //			if(cris.paired() && out2==null && (in1!=null && !ffin1.samOrBam() && !ffout1.samOrBam())){
257 //				outstream.println("Writing interleaved.");
258 //			}
259 //
260 //			ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, qfout1, qfout2, buff, null, false);
261 //			ros.start(); //Start the stream
262 //		}else{ros=null;}
263 
264 		//Reset counters
265 		readsProcessed=0;
266 		basesProcessed=0;
267 
268 		//Process the read stream
269 		processInner(cris);
270 
271 		if(verbose){outstream.println("Finished; closing streams.");}
272 
273 		//Write anything that was accumulated by ReadStats
274 		errorState|=ReadStats.writeAll();
275 		//Close the read streams
276 		errorState|=ReadWrite.closeStreams(cris);
277 
278 		//Report timing and results
279 		t.stop();
280 		outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
281 
282 		//Throw an exception of there was an error in a thread
283 		if(errorState){
284 			throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
285 		}
286 	}
287 
288 	/** Iterate through the reads */
processInner(final ConcurrentReadInputStream cris)289 	public void processInner(final ConcurrentReadInputStream cris){
290 		//Do anything necessary prior to processing
291 		final int ziplevel0=ReadWrite.ZIPLEVEL;
292 		ReadWrite.ZIPLEVEL=Tools.mid(1, ReadWrite.ZIPLEVEL, 2);
293 
294 		ArrayList<Read> storage=new ArrayList<Read>();
295 
296 		final long maxMem=Shared.memAvailable(1);
297 		final long memLimit=(long)(maxMem*.75);
298 		final long currentLimit=(long)(maxMem*memMult);
299 		final int readLimit=2000000000;
300 		long currentMem=0;
301 		long dumped=0;
302 		long dumps=0;
303 //		IntList dumpCount=new IntList();
304 		AtomicLong outstandingMem=new AtomicLong();
305 
306 		if(verbose){outstream.println("maxMem="+maxMem+", memLimit="+memLimit+", currentLimit="+currentLimit+", currentLimit="+currentLimit);}
307 
308 		{
309 			//Grab the first ListNum of reads
310 			ListNum<Read> ln=cris.nextList();
311 			//Grab the actual read list from the ListNum
312 			ArrayList<Read> reads=(ln!=null ? ln.list : null);
313 
314 			//Check to ensure pairing is as expected
315 			if(reads!=null && !reads.isEmpty()){
316 				Read r=reads.get(0);
317 				assert((ffin1==null || ffin1.samOrBam()) || (r.mate!=null)==cris.paired());
318 			}
319 
320 			//As long as there is a nonempty read list...
321 			while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
322 				if(verbose2){outstream.println("Fetched "+reads.size()+" reads.");}
323 
324 				//Loop through each read in the list
325 				for(int idx=0; idx<reads.size(); idx++){
326 					final Read r1=reads.get(idx);
327 					final Read r2=r1.mate;
328 
329 					//Track the initial length for statistics
330 					final int initialLength1=r1.length();
331 					final int initialLength2=(r1.mateLength());
332 
333 					//Increment counters
334 					readsProcessed+=r1.pairCount();
335 					basesProcessed+=initialLength1+initialLength2;
336 					maxLengthObserved=Tools.max(maxLengthObserved, initialLength1, initialLength2);
337 
338 					if(minlen<1 || initialLength1>=minlen || initialLength2>=minlen){
339 						currentMem+=r1.countBytes()+(r2==null ? 0 : r2.countBytes());
340 						storage.add(r1);
341 					}
342 				}
343 
344 				if(allowTempFiles && (currentMem>=currentLimit || storage.size()>=readLimit)){
345 					if(verbose){outstream.println("currentMem: "+currentMem+" >= "+currentLimit+", dumping. ");}
346 					outstandingMem.addAndGet(currentMem);
347 //					dumpCount.add(storage.size());
348 					shuffleAndDump(storage, currentMem, outstandingMem, null, false);
349 					storage=new ArrayList<Read>();
350 					dumped+=currentMem;
351 					dumps++;
352 					currentMem=0;
353 					if(verbose){outstream.println("Waiting on memory; outstandingMem="+outstandingMem);}
354 					waitOnMemory(outstandingMem, memLimit);
355 					if(verbose){outstream.println("Done waiting; outstandingMem="+outstandingMem);}
356 				}
357 
358 				//Notify the input stream that the list was used
359 				cris.returnList(ln);
360 //				if(verbose){outstream.println("Returned a list.");}
361 
362 				//Fetch a new list
363 				ln=cris.nextList();
364 				reads=(ln!=null ? ln.list : null);
365 			}
366 
367 			//Notify the input stream that the final list was used
368 			if(ln!=null){
369 				cris.returnList(ln.id, ln.list==null || ln.list.isEmpty());
370 			}
371 		}
372 
373 		outstream.println("Finished reading input.");
374 
375 		outstandingMem.addAndGet(currentMem);
376 		if(dumps==0){
377 			ReadWrite.ZIPLEVEL=ziplevel0;
378 			outstream.println("Sorting.");
379 			if(out1!=null){
380 				shuffleAndDump(storage, currentMem, outstandingMem, out1, useSharedHeader);
381 				storage=null;
382 				waitOnMemory(outstandingMem, 0);
383 			}
384 		}else{
385 //			dumpCount.add(storage.size());
386 			shuffleAndDump(storage, currentMem, outstandingMem, null, false);
387 			storage=null;
388 			waitOnMemory(outstandingMem, 0);
389 			outstream.println("Merging "+(dumps+1)+" files.");
390 			ReadWrite.ZIPLEVEL=ziplevel0;
391 			if(maxLengthObserved*(dumped+1)>200000000L){
392 				outstream.println("Reduced buffer sizes prior to merging due to low memory.");
393 				Shared.capBufferLen(4);
394 				Shared.capBuffers(1);
395 			}
396 			mergeAndDump(outTemp, /*dumpCount, */useSharedHeader);
397 		}
398 
399 	}
400 
waitOnMemory(AtomicLong outstandingMem, long target)401 	private void waitOnMemory(AtomicLong outstandingMem, long target){
402 		if(outstandingMem.get()>target){
403 			if(verbose){outstream.println("Syncing; outstandingMem="+outstandingMem);}
404 			while(outstandingMem.get()>target){
405 				try {
406 					synchronized(outstandingMem){
407 						outstandingMem.wait(2000);
408 					}
409 				} catch (InterruptedException e) {
410 					// TODO Auto-generated catch block
411 					e.printStackTrace();
412 				}
413 			}
414 		}
415 	}
416 
417 	/*--------------------------------------------------------------*/
418 	/*----------------         Inner Methods        ----------------*/
419 	/*--------------------------------------------------------------*/
420 
mergeRecursive(final ArrayList<String> inList)421 	private ArrayList<String> mergeRecursive(final ArrayList<String> inList){
422 		assert(maxFiles>1);
423 		ArrayList<String> currentList=inList;
424 		final int oldZL=ReadWrite.ZIPLEVEL;
425 		while(currentList.size()>maxFiles){
426 			ReadWrite.ZIPLEVEL=Tools.min(ReadWrite.ZIPLEVEL, 4);
427 			final int size=currentList.size();
428 			final int groups=(size+maxFiles-1)/maxFiles;
429 			assert(groups>0 && groups<size);
430 			ArrayList<ArrayList<String>> listList=new ArrayList<ArrayList<String>>();
431 			ArrayList<String> outList=new ArrayList<String>();
432 			for(int i=0; i<groups; i++){
433 				listList.add(new ArrayList<String>());
434 			}
435 			for(int i=0; i<size; i++){
436 				listList.get(i%groups).add(currentList.get(i));
437 			}
438 			for(ArrayList<String> subList : listList){
439 				String temp=getTempFile();
440 				FileFormat ff=FileFormat.testOutput(temp, FileFormat.FASTQ, null, true, false, false, false);
441 				merge(subList, ff, null);
442 				outList.add(temp);
443 			}
444 			currentList=outList;
445 		}
446 		ReadWrite.ZIPLEVEL=oldZL;
447 		return currentList;
448 	}
449 
merge(ArrayList<String> inList, FileFormat ff1, FileFormat ff2)450 	public void merge(ArrayList<String> inList, FileFormat ff1, FileFormat ff2){
451 		final int oldBuffers=Shared.numBuffers();
452 		final int oldBufferLen=Shared.bufferLen();
453 		if(inList.size()>4){
454 			outstream.println("Reduced buffer sizes prior to merging.");
455 			Shared.capBufferLen(4);
456 			Shared.capBuffers(1);
457 		}
458 
459 		errorState|=mergeAndDump(inList, /*null, */ff1, ff2, delete, useSharedHeader, outstream);
460 		Shared.setBufferLen(oldBufferLen);
461 		Shared.setBuffers(oldBuffers);
462 	}
463 
getTempFile()464 	private String getTempFile(){
465 		String temp;
466 		File dir=new File(".");//(Shared.tmpdir()==null ? null : new File(Shared.tmpdir()));
467 		if(dir!=null && !dir.exists()){dir.mkdirs();}
468 		try {
469 			temp=File.createTempFile("sort_temp_", tempExt, dir).toString();
470 		} catch (IOException e) {
471 			// TODO Auto-generated catch block
472 			e.printStackTrace();
473 			KillSwitch.kill(e.getMessage());
474 			return null;
475 		}
476 		return temp;
477 	}
478 
mergeAndDump(ArrayList<String> fnames, boolean useHeader)479 	private boolean mergeAndDump(ArrayList<String> fnames, /*IntList dumpCount, */boolean useHeader) {
480 		if(fnames.size()*maxLengthObserved>2000000000 || fnames.size()>64){
481 			outstream.println("Performing recursive merge to reduce open files.");
482 			fnames=mergeRecursive(fnames);
483 		}
484 		return mergeAndDump(fnames, /*dumpCount,*/ ffout1, ffout2, delete, useHeader, outstream);
485 	}
486 
mergeAndDump(ArrayList<String> fnames, FileFormat ffout1, FileFormat ffout2, boolean delete, boolean useHeader, PrintStream outstream)487 	public boolean mergeAndDump(ArrayList<String> fnames, /*IntList dumpCount, */FileFormat ffout1, FileFormat ffout2, boolean delete, boolean useHeader, PrintStream outstream) {
488 
489 		final int oldBuffers=Shared.numBuffers();
490 		final int oldBufferLen=Shared.bufferLen();
491 
492 		if(fnames.size()>4){
493 			Shared.capBufferLen(8);
494 			Shared.setBuffers(1);
495 		}
496 
497 		System.err.println("Merging "+fnames);
498 //		outstream.println("zl="+ReadWrite.ZIPLEVEL);
499 //		outstream.println("ztd="+ReadWrite.ZIP_THREAD_DIVISOR);
500 //		outstream.println("mzt="+ReadWrite.MAX_ZIP_THREADS);
501 //		outstream.println("pigz="+ReadWrite.USE_PIGZ);
502 
503 		ListNum.setDeterministicRandom(false);
504 		boolean errorState=false;
505 		final ConcurrentReadOutputStream ros;
506 		if(ffout1!=null){
507 			final int buff=1;
508 			ros=ConcurrentReadOutputStream.getStream(ffout1, ffout2, null, null, buff, null, useHeader);
509 			ros.start(); //Start the stream
510 		}else{ros=null;}
511 
512 		ArrayList<CrisContainer> cclist=new ArrayList<CrisContainer>(fnames.size());
513 		for(int i=0; i<fnames.size(); i++){
514 			String fname=fnames.get(i);
515 //			int size=(dumpCount==null ? -1 : dumpCount.get(i));
516 			CrisContainer cc=new CrisContainer(fname, null, false);
517 			if(cc.peek()!=null){
518 				cclist.add(cc);
519 			}
520 		}
521 		ArrayList<CrisContainer> cclist2=(ArrayList<CrisContainer>) cclist.clone();
522 
523 		//TODO: Use the read counts, stored in dumpCount, to approximately restore random values for shuffling.
524 		mergeAndDump(cclist, ros, outstream);
525 		if(verbose){
526 			outstream.println("Finished processing "+fnames);
527 		}
528 
529 		for(CrisContainer cc : cclist2){
530 			errorState|=cc.close();
531 		}
532 		if(delete){
533 			for(String fname : fnames){
534 				new File(fname).delete();
535 			}
536 		}
537 		if(ros!=null){errorState|=ReadWrite.closeStream(ros);}
538 
539 		Shared.setBufferLen(oldBufferLen);
540 		Shared.setBuffers(oldBuffers);
541 
542 		return errorState;
543 	}
544 
mergeAndDump(final ArrayList<CrisContainer> q, final ConcurrentReadOutputStream ros, PrintStream outstream)545 	private static void mergeAndDump(final ArrayList<CrisContainer> q, final ConcurrentReadOutputStream ros, PrintStream outstream) {
546 
547 		for(CrisContainer cc : q){
548 			assert(!cc.cris().paired()) : FASTQ.TEST_INTERLEAVED+", "+FASTQ.FORCE_INTERLEAVED;
549 		}
550 
551 		final int limit=20000;
552 		ArrayList<Read> buffer=new ArrayList<Read>(2*limit);
553 		while(!q.isEmpty()){
554 			if(verbose2){outstream.println("q size: "+q.size());}
555 			for(int i=0; !q.isEmpty() && (buffer.size()<limit || i<1); i++){//For loop to force it to run at least once
556 				int num=randy.nextInt(q.size());
557 				CrisContainer cc=q.get(num);
558 				if(verbose2){outstream.println("Polled a cc.");}
559 				ArrayList<Read> list=cc.fetch();
560 				if(verbose2){outstream.println("Grabbed "+list.size()+" reads.");}
561 				if(list==null){
562 					q.remove(num);
563 				}else{
564 					buffer.addAll(list);
565 				}
566 				if(verbose2){outstream.println("Buffer size: "+buffer.size());}
567 			}
568 			Collections.shuffle(buffer);
569 			if(verbose2){outstream.println("Shuffled buffer.");}
570 
571 			ArrayList<Read> list=new ArrayList<Read>(buffer.size());
572 			list.addAll(buffer);
573 			if(ros!=null){ros.add(list, 0);}
574 
575 			buffer.clear();
576 		}
577 
578 		assert(buffer.isEmpty());
579 	}
580 
shuffleAndDump(final ArrayList<Read> storage, final long currentMem, final AtomicLong outstandingMem, String fname, boolean useHeader)581 	private void shuffleAndDump(final ArrayList<Read> storage, final long currentMem, final AtomicLong outstandingMem, String fname, boolean useHeader) {
582 		String temp=fname;
583 		if(temp==null){
584 			synchronized(outTemp){
585 				if(verbose2){outstream.println("Synced to outTemp to make a temp file.");}
586 				temp=getTempFile();
587 				if(verbose2){outstream.println("Temp file: "+temp);}
588 				outTemp.add(temp);
589 			}
590 		}
591 
592 		if(verbose || true){outstream.println("Created a WriteThread for "+temp);}
593 		WriteThread wt=new WriteThread(storage, currentMem, outstandingMem, temp, useHeader, outstream);
594 		wt.start();
595 	}
596 
597 	/*--------------------------------------------------------------*/
598 	/*----------------         Inner Classes        ----------------*/
599 	/*--------------------------------------------------------------*/
600 
601 	private static class WriteThread extends Thread{
602 
WriteThread(final ArrayList<Read> storage_, final long currentMem_, final AtomicLong outstandingMem_, String fname_, boolean useHeader_, PrintStream outstream_)603 		public WriteThread(final ArrayList<Read> storage_, final long currentMem_, final AtomicLong outstandingMem_, String fname_, boolean useHeader_, PrintStream outstream_){
604 			storage=storage_;
605 			currentMem=currentMem_;
606 			outstandingMem=outstandingMem_;
607 			fname=fname_;
608 			useHeader=useHeader_;
609 			outstream=outstream_;
610 		}
611 
612 		@Override
run()613 		public void run(){
614 
615 			if(verbose){outstream.println("Started a WriteThread.");}
616 			final FileFormat ffout=FileFormat.testOutput(fname, FileFormat.FASTQ, null, true, false, false, false);
617 			final ConcurrentReadOutputStream ros;
618 			if(ffout!=null){
619 				final int buff=4;
620 				ros=ConcurrentReadOutputStream.getStream(ffout, null, null, null, buff, null, useHeader);
621 				ros.start(); //Start the stream
622 			}else{ros=null;}
623 
624 			if(verbose){outstream.println("Started a ros.");}
625 			Collections.shuffle(storage);
626 
627 			if(verbose){outstream.println("Sorted reads.");}
628 
629 			ArrayList<Read> buffer=new ArrayList<Read>(200);
630 			long id=0;
631 			for(int i=0, lim=storage.size(); i<lim; i++){
632 				Read r=storage.set(i, null);
633 				buffer.add(r);
634 				if(buffer.size()>=200){
635 					if(ros!=null){ros.add(buffer, id);}
636 					id++;
637 					buffer=new ArrayList<Read>(200);
638 				}
639 			}
640 			if(ros!=null && buffer.size()>0){ros.add(buffer, id);}
641 			errorState|=ReadWrite.closeStream(ros);
642 			if(verbose){outstream.println("Closed ros.");}
643 
644 			synchronized(outstandingMem){
645 				outstandingMem.addAndGet(-currentMem);
646 				if(verbose){outstream.println("Decremented outstandingMem: "+outstandingMem);}
647 				outstandingMem.notify();
648 				if(verbose){outstream.println("Notified outstandingMem.");}
649 			}
650 		}
651 
652 		final ArrayList<Read> storage;
653 		final long currentMem;
654 		final AtomicLong outstandingMem;
655 		final String fname;
656 		final boolean useHeader;
657 		boolean errorState=false;
658 		final PrintStream outstream;
659 
660 	}
661 
662 	/*--------------------------------------------------------------*/
663 	/*----------------            Fields            ----------------*/
664 	/*--------------------------------------------------------------*/
665 
	/** Primary input file path */
	private String in1=null;
	/** Secondary input file path */
	private String in2=null;

	/** Copied from the parser and passed to the input stream with in1; presumably a quality-file path - TODO confirm */
	private String qfin1=null;
	/** Copied from the parser and passed to the input stream with in2; presumably a quality-file path - TODO confirm */
	private String qfin2=null;

	/** Primary output file path */
	private String out1=null;
	/** Secondary output file path */
	private String out2=null;

	/** Names of the temp files created while dumping; merged once all input has been read */
	private ArrayList<String> outTemp=new ArrayList<String>();

	/** Override input file extension */
	private String extin=null;
	/** Override output file extension */
	private String extout=null;

	/** Suffix used when creating temp files; chosen in the constructor */
	private String tempExt=null;

	/*--------------------------------------------------------------*/

	/** Length of the longest read observed so far */
	long maxLengthObserved=0;

	/** Number of reads processed */
	protected long readsProcessed=0;
	/** Number of bases processed */
	protected long basesProcessed=0;

	/** Quit after processing this many input reads; -1 means no limit */
	private long maxReads=-1;

	/** Whether input files are deleted after they have been merged */
	private boolean delete=true;

	/** Set in process(): true when both the input and the primary output are sam/bam */
	private boolean useSharedHeader=false;

	/** If false, reads are never dumped to temp files while reading */
	private boolean allowTempFiles=true;

	/** When positive, a read (pair) is kept only if at least one mate is at least this long */
	private int minlen=0;

	/** Fraction of available memory that stored reads may occupy before being dumped to a temp file */
	private float memMult=0.35f;

	/** Max files to merge per pass */
	private int maxFiles=16;

	/** Seed used to build the random source; set by the seed and deterministic flags, -1 by default */
	private long seed=-1;

	/** Random source used to choose which file to read from while merging; replaced in the constructor */
	static Random randy=Shared.threadLocalRandom();

	/*--------------------------------------------------------------*/
	/*----------------         Final Fields         ----------------*/
	/*--------------------------------------------------------------*/

	/** Primary input file */
	private final FileFormat ffin1;
	/** Secondary input file */
	private final FileFormat ffin2;

	/** Primary output file */
	private final FileFormat ffout1;
	/** Secondary output file */
	private final FileFormat ffout2;

	/*--------------------------------------------------------------*/
	/*----------------        Common Fields         ----------------*/
	/*--------------------------------------------------------------*/

	/** Print status messages to this output stream */
	private PrintStream outstream=System.err;
	/** Print verbose messages */
	public static boolean verbose=false;
	/** Print extra debugging messages; a compile-time constant, currently disabled */
	public static final boolean verbose2=false;
	/** True if an error was encountered */
	public boolean errorState=false;
	/** Overwrite existing output files */
	private boolean overwrite=false;
	/** Append to existing output files */
	private boolean append=false;
	/** This flag has no effect on singlethreaded programs */
	private final boolean ordered=false;
749 
750 }
751