1 package gff;
2 
3 import java.io.File;
4 import java.io.PrintStream;
5 import java.util.ArrayList;
6 import java.util.Arrays;
7 import java.util.HashMap;
8 import java.util.concurrent.atomic.AtomicInteger;
9 import java.util.concurrent.atomic.AtomicLong;
10 
11 import aligner.Alignment;
12 import fileIO.ByteFile;
13 import fileIO.ByteStreamWriter;
14 import fileIO.FileFormat;
15 import fileIO.ReadWrite;
16 import prok.PGMTools;
17 import prok.ProkObject;
18 import shared.Parse;
19 import shared.Parser;
20 import shared.PreParser;
21 import shared.ReadStats;
22 import shared.Shared;
23 import shared.Timer;
24 import shared.Tools;
25 import stream.ConcurrentReadOutputStream;
26 import stream.Read;
27 import stream.ReadInputStream;
28 import structures.ByteBuilder;
29 import tax.GiToTaxid;
30 import tax.TaxClient;
31 import tax.TaxTree;
32 import template.Accumulator;
33 import template.ThreadWaiter;
34 
35 public class CutGff implements Accumulator<CutGff.ProcessThread>  {
36 
37 	/*--------------------------------------------------------------*/
38 	/*----------------        Initialization        ----------------*/
39 	/*--------------------------------------------------------------*/
40 
41 	/**
42 	 * Code entrance from the command line.
43 	 * @param args Command line arguments
44 	 */
main(String[] args)45 	public static void main(String[] args){
46 		//Start a timer immediately upon code entrance.
47 		Timer t=new Timer();
48 
49 		//Create an instance of this class
50 		CutGff x=new CutGff(args);
51 
52 		//Run the object
53 		x.process(t);
54 
55 		//Close the print stream if it was redirected
56 		Shared.closeStream(x.outstream);
57 	}
58 
59 	/**
60 	 * Constructor.
61 	 * @param args Command line arguments
62 	 */
CutGff(String[] args)63 	public CutGff(String[] args){
64 		{//Preparse block for help, config files, and outstream
65 			PreParser pp=new PreParser(args, null/*getClass()*/, false);
66 			args=pp.args;
67 			outstream=pp.outstream;
68 		}
69 
70 		//Set shared static variables prior to parsing
71 		ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
72 		ReadWrite.MAX_ZIP_THREADS=Shared.threads();
73 
74 		Shared.TRIM_READ_COMMENTS=Shared.TRIM_RNAME=true;
75 		Read.TO_UPPER_CASE=true;
76 		Read.VALIDATE_IN_CONSTRUCTOR=true;
77 		GffLine.parseAttributes=true;
78 
79 		{//Parse the arguments
80 			final Parser parser=parse(args);
81 			overwrite=parser.overwrite;
82 			append=parser.append;
83 
84 			out=parser.out1;
85 		}
86 
87 		if(alignRibo){
88 			//Load sequences
89 			ProkObject.loadConsensusSequenceFromFile(false, false);
90 		}
91 
92 		fixExtensions(); //Add or remove .gz or .bz2 as needed
93 		checkFileExistence(); //Ensure files can be read and written
94 		checkStatics(); //Adjust file-related static fields as needed for this program
95 
96 		ffout=FileFormat.testOutput(out, FileFormat.PGM, null, true, overwrite, append, false);
97 	}
98 
99 	/*--------------------------------------------------------------*/
100 	/*----------------    Initialization Helpers    ----------------*/
101 	/*--------------------------------------------------------------*/
102 
103 	/** Parse arguments from the command line */
parse(String[] args)104 	private Parser parse(String[] args){
105 
106 		Parser parser=new Parser();
107 		parser.overwrite=overwrite;
108 		for(int i=0; i<args.length; i++){
109 			String arg=args[i];
110 			String[] split=arg.split("=");
111 			String a=split[0].toLowerCase();
112 			String b=split.length>1 ? split[1] : null;
113 			if(b!=null && b.equalsIgnoreCase("null")){b=null;}
114 
115 //			outstream.println(arg+", "+a+", "+b);
116 			if(PGMTools.parseStatic(arg, a, b)){
117 				//do nothing
118 			}else if(a.equals("in") || a.equals("infna") || a.equals("fnain") || a.equals("fna") || a.equals("ref")){
119 				assert(b!=null);
120 				Tools.addFiles(b, fnaList);
121 			}else if(a.equals("gff") || a.equals("ingff") || a.equals("gffin")){
122 				assert(b!=null);
123 				Tools.addFiles(b, gffList);
124 			}else if(a.equals("verbose")){
125 				verbose=Parse.parseBoolean(b);
126 				ReadWrite.verbose=verbose;
127 			}else if(a.equals("alignribo") || a.equals("align")){
128 				alignRibo=Parse.parseBoolean(b);
129 			}else if(a.equals("adjustendpoints")){
130 				adjustEndpoints=Parse.parseBoolean(b);
131 			}else if(a.equalsIgnoreCase("slop16s") || a.equalsIgnoreCase("16sslop") || a.equalsIgnoreCase("ssuslop")){
132 				ssuSlop=Integer.parseInt(b);
133 			}else if(a.equalsIgnoreCase("slop23s") || a.equalsIgnoreCase("23sslop") || a.equalsIgnoreCase("lsuslop")){
134 				lsuSlop=Integer.parseInt(b);
135 			}else if(a.equalsIgnoreCase("maxns") || a.equalsIgnoreCase("maxundefined")){
136 				maxNs=Integer.parseInt(b);
137 			}else if(a.equalsIgnoreCase("maxnrate") || a.equalsIgnoreCase("maxnfraction")){
138 				maxNFraction=Integer.parseInt(b);
139 			}else if(a.equals("invert")){
140 				invert=Parse.parseBoolean(b);
141 			}else if(a.equals("type") || a.equals("types")){
142 				types=b;
143 			}else if(a.equals("attributes") || a.equals("requiredattributes")){
144 				requiredAttributes=b.split(",");
145 			}else if(a.equals("banattributes") || a.equals("bannedattributes")){
146 				bannedAttributes=b.split(",");
147 			}else if(a.equals("banpartial")){
148 				banPartial=Parse.parseBoolean(b);
149 			}
150 
151 			else if(a.equalsIgnoreCase("renameByTaxID")){
152 				renameByTaxID=Parse.parseBoolean(b);
153 			}else if(a.equals("taxmode")){
154 				if("accession".equalsIgnoreCase(b)){
155 					taxMode=ACCESSION_MODE;
156 				}else if("header".equalsIgnoreCase(b)){
157 					taxMode=HEADER_MODE;
158 				}else if("gi".equalsIgnoreCase(b)){
159 					taxMode=GI_MODE;
160 				}else if("taxid".equalsIgnoreCase(b)){
161 					taxMode=TAXID_MODE;
162 				}else{
163 					assert(false) : "Bad tax mode: "+b;
164 				}
165 			}else if(a.equals("requirepresent")){
166 				requirePresent=Parse.parseBoolean(b);
167 			}else if(a.equalsIgnoreCase("onePerFile")){
168 				onePerFile=Parse.parseBoolean(b);
169 			}else if(a.equalsIgnoreCase("pickBest") || a.equalsIgnoreCase("findBest") || a.equalsIgnoreCase("keepBest")){
170 				pickBest=Parse.parseBoolean(b);
171 			}
172 
173 			else if(a.equals("minlen")){
174 				minLen=Integer.parseInt(b);
175 			}else if(a.equals("maxlen")){
176 				maxLen=Integer.parseInt(b);
177 			}
178 
179 			else if(ProkObject.parse(arg, a, b)){
180 				//do nothing
181 			}else if(parser.parse(arg, a, b)){
182 				//do nothing
183 			}else if(arg.indexOf('=')<0 && new File(arg).exists() && FileFormat.isFastaFile(arg)){
184 				fnaList.add(arg);
185 			}else{
186 				outstream.println("Unknown parameter "+args[i]);
187 				assert(false) : "Unknown parameter "+args[i];
188 				//				throw new RuntimeException("Unknown parameter "+args[i]);
189 			}
190 		}
191 
192 		ArrayList<String> banned=new ArrayList<String>();
193 		if(banPartial){banned.add("partial=true");}
194 		if(bannedAttributes!=null){
195 			for(String s : bannedAttributes){banned.add(s);}
196 		}
197 		bannedAttributes=banned.isEmpty() ? null : banned.toArray(new String[0]);
198 
199 		if(gffList.isEmpty()){
200 			for(String s : fnaList){
201 				String prefix=ReadWrite.stripExtension(s);
202 				String gff=prefix+".gff";
203 				File f=new File(gff);
204 				if(!f.exists()){
205 					String gz=gff+".gz";
206 					f=new File(gz);
207 					assert(f.exists() && f.canRead()) : "Can't read file "+gff;
208 					gff=gz;
209 				}
210 				gffList.add(gff);
211 			}
212 		}
213 		assert(gffList.size()==fnaList.size()) : "Number of fna and gff files do not match: "+fnaList.size()+", "+gffList.size();
214 		return parser;
215 	}
216 
217 	/** Add or remove .gz or .bz2 as needed */
fixExtensions()218 	private void fixExtensions(){
219 		fnaList=Tools.fixExtension(fnaList);
220 		gffList=Tools.fixExtension(gffList);
221 		if(fnaList.isEmpty()){throw new RuntimeException("Error - at least one input file is required.");}
222 	}
223 
224 	/** Ensure files can be read and written */
checkFileExistence()225 	private void checkFileExistence(){
226 		//Ensure output files can be written
227 		if(!Tools.testOutputFiles(overwrite, append, false, out)){
228 			outstream.println((out==null)+", "+out);
229 			throw new RuntimeException("\n\noverwrite="+overwrite+"; Can't write to output file "+out+"\n");
230 		}
231 
232 		//Ensure input files can be read
233 		ArrayList<String> foo=new ArrayList<String>();
234 		foo.addAll(fnaList);
235 		foo.addAll(gffList);
236 		if(!Tools.testInputFiles(false, true, foo.toArray(new String[0]))){
237 			throw new RuntimeException("\nCan't read some input files.\n");
238 		}
239 
240 		//Ensure that no file was specified multiple times
241 		foo.add(out);
242 		if(!Tools.testForDuplicateFiles(true, foo.toArray(new String[0]))){
243 			throw new RuntimeException("\nSome file names were specified multiple times.\n");
244 		}
245 	}
246 
247 	/** Adjust file-related static fields as needed for this program */
checkStatics()248 	private static void checkStatics(){
249 		//Adjust the number of threads for input file reading
250 		if(!ByteFile.FORCE_MODE_BF1 && !ByteFile.FORCE_MODE_BF2 && Shared.threads()>2){
251 			ByteFile.FORCE_MODE_BF2=true;
252 		}
253 	}
254 
255 	/*--------------------------------------------------------------*/
256 	/*----------------         Actual Code          ----------------*/
257 	/*--------------------------------------------------------------*/
258 
259 
260 
261 	/*--------------------------------------------------------------*/
262 
process(Timer t)263 	public void process(Timer t){
264 		if(Shared.threads()>2 && fnaList.size()>1){
265 			processMT(t);
266 		}else{
267 			processST(t);
268 		}
269 	}
270 
processST(Timer t)271 	public void processST(Timer t){
272 		ByteStreamWriter bsw=(ffout==null ? null : new ByteStreamWriter(ffout));
273 		if(bsw!=null){bsw.start();}
274 
275 		for(int i=0; i<fnaList.size(); i++){
276 			processFileST(fnaList.get(i), gffList.get(i), types, bsw);
277 		}
278 
279 		if(bsw!=null){bsw.poisonAndWait();}
280 		t.stop();
281 		if(ffout!=null){outstream.println("Wrote "+out);}
282 		outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
283 		outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
284 		if(alignRibo){
285 			outstream.println(Tools.number("Flipped:           ", flipped.get(), 8));
286 			outstream.println(Tools.number("Failed Alignment:  ", failed.get(), 8));
287 		}
288 	}
289 
processMT(Timer t)290 	public void processMT(Timer t){
291 
292 		//Optionally create a read output stream
293 		final ConcurrentReadOutputStream ros=makeCros();
294 
295 		//Reset counters
296 		readsProcessed=readsOut=0;
297 		basesProcessed=basesOut=0;
298 
299 		//Process the reads in separate threads
300 		spawnThreads(ros);
301 
302 		if(verbose){outstream.println("Finished; closing streams.");}
303 
304 		//Write anything that was accumulated by ReadStats
305 		errorState|=ReadStats.writeAll();
306 		//Close the read streams
307 		errorState|=ReadWrite.closeStream(ros);
308 
309 		//Report timing and results
310 		t.stop();
311 		if(ffout!=null){outstream.println("Wrote "+out);}
312 		outstream.println(Tools.timeReadsBasesProcessed(t, readsProcessed, basesProcessed, 8));
313 		outstream.println(Tools.readsBasesOut(readsProcessed, basesProcessed, readsOut, basesOut, 8, false));
314 		if(alignRibo){
315 			outstream.println(Tools.number("Flipped:           ", flipped.get(), 8));
316 			outstream.println(Tools.number("Failed Alignment:  ", failed.get(), 8));
317 		}
318 
319 		//Throw an exception of there was an error in a thread
320 		if(errorState){
321 			throw new RuntimeException(getClass().getName()+" terminated in an error state; the output may be corrupt.");
322 		}
323 	}
324 
325 	/*--------------------------------------------------------------*/
326 
hasAttributes(GffLine gline)327 	private boolean hasAttributes(GffLine gline){
328 		int len=gline.length();
329 		if(len<minLen || len>maxLen){return false;}
330 		if(hasAttributes(gline, bannedAttributes)){return false;}
331 		return requiredAttributes==null || hasAttributes(gline, requiredAttributes);
332 	}
333 
hasAttributes(GffLine gline, String[] attributes)334 	private static boolean hasAttributes(GffLine gline, String[] attributes){
335 		if(attributes==null){return false;}
336 		for(String s : attributes){
337 			if(gline.attributes.contains(s)){
338 				return true;
339 			}
340 		}
341 		return false;
342 	}
343 
processFileST(String fna, String gff, String types, ByteStreamWriter bsw)344 	private void processFileST(String fna, String gff, String types, ByteStreamWriter bsw){
345 		ArrayList<Read> reads=processFile(fna, gff, types);
346 		if(reads!=null){
347 			for(Read r : reads){
348 				if(bsw!=null){bsw.println(r);}
349 			}
350 		}
351 	}
352 
processFile(String fna, String gff, String types)353 	private ArrayList<Read> processFile(String fna, String gff, String types){
354 		ArrayList<GffLine> lines=GffLine.loadGffFile(gff, types, false);
355 
356 		ArrayList<Read> list=ReadInputStream.toReads(fna, FileFormat.FA, -1);
357 		HashMap<String, Read> map=new HashMap<String, Read>();
358 
359 		for(Read r : list){
360 			readsProcessed++;
361 			basesProcessed+=r.length();
362 			map.put(r.id, r);
363 		}
364 
365 		if(renameByTaxID){//Note this must be AFTER adding to the hashmap.
366 			renameByTaxID(list);
367 		}
368 
369 		ArrayList<Read> outList=processLines(lines, map, invert);
370 
371 		if(invert){
372 			for(Read r : list){
373 				readsOut++;
374 				basesOut+=r.length();
375 			}
376 			return list;
377 		}else{
378 			if(outList!=null){
379 				for(Read r : outList){
380 					readsOut++;
381 					basesOut+=r.length();
382 				}
383 			}
384 			return outList;
385 		}
386 	}
387 
renameByTaxID(ArrayList<Read> list)388 	private void renameByTaxID(ArrayList<Read> list){
389 		ByteBuilder bb=new ByteBuilder();
390 		for(Read r : list){
391 			if(bb.length>0){bb.comma();}
392 			bb.append(taxMode==HEADER_MODE ? r.id : taxMode==ACCESSION_MODE ? parseAccession(r.id) : parseGi(r.id));
393 		}
394 		final int[] ids;
395 		if(taxMode==ACCESSION_MODE){
396 			ids=TaxClient.accessionToTaxidArray(bb.toString());
397 		}else if(taxMode==GI_MODE){
398 			ids=TaxClient.giToTaxidArray(bb.toString());
399 		}else if(taxMode==HEADER_MODE){
400 			ids=TaxClient.headerToTaxidArray(bb.toString());
401 		}else if(taxMode==TAXID_MODE){
402 			ids=parseTaxIds(list);
403 		}else{
404 			throw new RuntimeException("Bad mode: "+TAXID_MODE);
405 		}
406 		assert(ids.length==list.size()) : ids.length+", "+list.size();
407 		for(int i=0; i<ids.length; i++){
408 			Read r=list.get(i);
409 			int id=ids[i];
410 
411 			if(r.id!=null && r.id.startsWith("tid|")){
412 				id=TaxTree.parseHeaderStatic(r.id);
413 				r.obj=id;
414 			}else {
415 				assert(id>=0 || !requirePresent) : "Can't find taxID for header: "+id+", "+r.name();
416 
417 				r.obj=id;
418 				r.id="tid|"+id+"|"+id;
419 			}
420 		}
421 	}
422 
parseTaxIds(ArrayList<Read> list)423 	private int[] parseTaxIds(ArrayList<Read> list){
424 		int[] ids=new int[list.size()];
425 		for(int i=0; i<list.size(); i++){
426 			Read r=list.get(i);
427 			int x=GiToTaxid.parseTaxidNumber(r.id, '|');
428 			ids[i]=x;
429 		}
430 		return ids;
431 	}
432 
parseAccession(String id)433 	private String parseAccession(String id){
434 		int dot=id.indexOf('.');
435 		int space=id.indexOf(' ');
436 //		assert(dot>0 && space>0) : "Unexpected header format: "+id+"\nTry header mode instead of accession mode.";
437 		int limit=Tools.min((dot<0 ? id.length() : dot), (space<0 ? id.length() : space));
438 		return id.substring(0, limit);
439 	}
440 
parseGi(String id)441 	private String parseGi(String id){
442 		assert(id.startsWith("gi|"));
443 		String[] split=id.split("|");
444 		return split[1];
445 	}
446 
processLines(ArrayList<GffLine> lines, HashMap<String, Read> map, boolean invertSelection)447 	private ArrayList<Read> processLines(ArrayList<GffLine> lines, HashMap<String, Read> map, boolean invertSelection){
448 		ArrayList<Read> list=null;
449 		for(GffLine gline : lines){
450 			if(hasAttributes(gline)){
451 				Read scaf=map.get(gline.seqid);
452 				assert(scaf!=null) : "Can't find "+gline.seqid+" in "+map.keySet();
453 
454 				boolean pass=true;
455 				Float identity=null;
456 				if(alignRibo && gline.inbounds(scaf.length())){
457 					int type=gline.prokType();
458 					identity=align(gline, scaf.bases, type);
459 					if(identity==null){pass=false;}
460 				}
461 
462 				if(pass){
463 					final int start=gline.start-1;
464 					final int stop=gline.stop-1;
465 
466 					if(invertSelection){
467 						byte[] bases=scaf.bases;
468 						for(int i=start; i<=stop; i++){
469 							if(i>=0 && i<bases.length){
470 								bases[i]='N';
471 							}
472 						}
473 					}else{
474 						if(start>=0 && stop<scaf.length()){
475 							String id=gline.attributes;
476 							if(renameByTaxID){
477 								id="tid|"+scaf.obj+"|"+id;
478 							}
479 							Read r=new Read(Arrays.copyOfRange(scaf.bases, start, stop+1), null, id, 1);
480 							r.obj=identity;
481 
482 							assert(!r.containsLowercase()) : r.toFasta()+"\n"
483 							+ "validated="+r.validated()+", scaf.validated="+scaf.validated()+", tuc="+Read.TO_UPPER_CASE+", vic="+Read.VALIDATE_IN_CONSTRUCTOR;
484 							if(maxNs>=0 || maxNFraction>=0){
485 								long allowed=Tools.min(maxNs>=0 ? maxNs : r.length(), (long)(r.length()*(maxNFraction>=0 ? maxNFraction : 1)));
486 								if(r.countUndefined()>allowed){r=null;}
487 							}
488 
489 							if(r!=null){
490 								if(gline.strand==1){r.reverseComplement();}
491 								if(list==null){list=new ArrayList<Read>(8);}
492 								list.add(r);
493 								if(onePerFile && !pickBest){break;}
494 							}
495 						}
496 					}
497 				}
498 			}
499 		}
500 		if(pickBest && list!=null && list.size()>1){
501 			Read best=null;
502 			float bestID=0;
503 			for(Read r : list){
504 				float identity=(r.obj==null ? 0.001f : (Float)r.obj);
505 				if(identity>bestID){
506 					bestID=identity;
507 					best=r;
508 				}
509 			}
510 			assert(best!=null);
511 			list.clear();
512 			list.add(best);
513 		}
514 		return list;
515 	}
516 
align(GffLine gline, byte[] scaf, int type)517 	private Float align(GffLine gline, byte[] scaf, int type){
518 		Read[] consensusReads=ProkObject.consensusReads(type);
519 		if(consensusReads==null || consensusReads.length==0){
520 			assert(false) : type+"\n"+gline.toString();
521 			return null;
522 		}
523 		byte[] universal=consensusReads[0].bases;
524 		float minIdentity=ProkObject.minID(type)*ID_MULT;
525 		if(universal==null){assert(false); return 1F;}
526 
527 		int start=gline.start-1;
528 		int stop=gline.stop-1;
529 		assert(start<=stop) : start+", "+stop+", "+scaf.length;
530 		assert(start>=0 && start<scaf.length) : start+", "+stop+", "+scaf.length;
531 		assert(stop>=0 && stop<scaf.length) : start+", "+stop+", "+scaf.length;
532 //		final int a=Tools.max(0, start-(adjustEndpoints ? 100 : 20));
533 //		final int b=Tools.min(scaf.length-1, stop+(adjustEndpoints ? 100 : 20));
534 		final int a=Tools.max(0, start);
535 		final int b=Tools.min(scaf.length-1, stop);
536 
537 		byte[] ref=Arrays.copyOfRange(scaf, a, b+1);
538 		Read r=new Read(ref, null, 0);
539 		if(gline.strand==GffLine.MINUS){r.reverseComplement();}
540 
541 		Alignment plus=new Alignment(r);
542 		plus.align(universal);
543 //		if(plus.id>=minIdentity){return true;}
544 
545 		r.reverseComplement();
546 		Alignment minus=new Alignment(r);
547 		minus.align(universal);
548 
549 		Alignment best=null;
550 		if(plus.id>=minus.id){
551 			best=plus;
552 //			System.err.println("Kept:    "+plus.id+" \t"+minus.id);
553 		}else{
554 			best=minus;
555 			if(minus.id>=minIdentity){
556 				if(verbose) {System.err.println("Flipped: "+plus.id+" \t"+minus.id+"");}
557 				flipped.incrementAndGet();
558 				gline.strand=Shared.MINUS;
559 			}
560 		}
561 		if(best.id>=minIdentity){
562 			return best.id;
563 		}else{
564 			if(verbose) {System.err.println("Failed alignment: "+plus.id+" \t"+minus.id);}
565 			failed.incrementAndGet();
566 			return null;
567 		}
568 //
569 //		int[] coords=KillSwitch.allocInt1D(2);
570 //		float id1=align(universal, scaf, a, b, minIdentity, coords);
571 //		final int rstart=coords[0], rstop=coords[1];
572 //		//				assert(false) : rstart+", "+rstop+", "+(rstop-rstart+1)+", "+start+", "+stop;
573 //		if(id1<minIdentity){
574 //			System.err.println("Low identity: "+String.format("%.2s", 100*id1));
575 //			return false;
576 //		}
577 //		if(adjustEndpoints){
578 //			int slop=(flag==4 ? AnalyzeGenes.lsuSlop : AnalyzeGenes.ssuSlop);
579 //			if(Tools.absdif(start, rstart)>slop){
580 //				//						System.err.println("rstart:\t"+start+" -> "+rstart);
581 //				start=rstart;
582 //			}
583 //			if(Tools.absdif(stop, rstop)>slop){
584 //				//						System.err.println("rstop: \t"+stop+" -> "+rstop);
585 //				stop=rstop;
586 //			}
587 //		}
588 	}
589 
590 	/*--------------------------------------------------------------*/
591 	/*----------------       Thread Management      ----------------*/
592 	/*--------------------------------------------------------------*/
593 
makeCros()594 	private ConcurrentReadOutputStream makeCros(){
595 		if(ffout==null){return null;}
596 
597 		//Select output buffer size based on whether it needs to be ordered
598 		final int buff=(ordered ? Tools.mid(16, 128, (Shared.threads()*2)/3) : 8);
599 
600 		final ConcurrentReadOutputStream ros=ConcurrentReadOutputStream.getStream(ffout, null, buff, null, false);
601 		ros.start(); //Start the stream
602 		return ros;
603 	}
604 
605 	/** Spawn process threads */
spawnThreads(ConcurrentReadOutputStream ros)606 	private void spawnThreads(ConcurrentReadOutputStream ros){
607 		//Do anything necessary prior to processing
608 
609 		//Determine how many threads may be used
610 		final int threads=Tools.min(Shared.threads(), fnaList.size());
611 
612 		//Controls access to input files
613 		AtomicInteger atom=new AtomicInteger(0);
614 
615 		//Fill a list with ProcessThreads
616 		ArrayList<ProcessThread> alpt=new ArrayList<ProcessThread>(threads);
617 		for(int i=0; i<threads; i++){
618 			alpt.add(new ProcessThread(atom, ros, i));
619 		}
620 
621 		//Start the threads and wait for them to finish
622 		boolean success=ThreadWaiter.startAndWait(alpt, this);
623 		errorState&=!success;
624 
625 		//Do anything necessary after processing
626 
627 	}
628 
629 	@Override
accumulate(ProcessThread pt)630 	public final void accumulate(ProcessThread pt){
631 		readsProcessed+=pt.readsProcessedT;
632 		basesProcessed+=pt.basesProcessedT;
633 		readsOut+=pt.readsOutT;
634 		basesOut+=pt.basesOutT;
635 		errorState|=(!pt.success);
636 	}
637 
638 	@Override
success()639 	public final boolean success(){return !errorState;}
640 
641 	/*--------------------------------------------------------------*/
642 	/*----------------         Inner Classes        ----------------*/
643 	/*--------------------------------------------------------------*/
644 
645 	/** This class is static to prevent accidental writing to shared variables.
646 	 * It is safe to remove the static modifier. */
647 	class ProcessThread extends Thread {
648 
649 		//Constructor
ProcessThread(final AtomicInteger atom_, ConcurrentReadOutputStream ros_, final int tid_)650 		ProcessThread(final AtomicInteger atom_, ConcurrentReadOutputStream ros_, final int tid_){
651 			atom=atom_;
652 			ros=ros_;
653 			tid=tid_;
654 		}
655 
656 		//Called by start()
657 		@Override
run()658 		public void run(){
659 			//Do anything necessary prior to processing
660 
661 			for(int fnum=atom.getAndIncrement(), lim=fnaList.size(); fnum<lim; fnum=atom.getAndIncrement()) {
662 				String fna=fnaList.get(fnum);
663 				String gff=gffList.get(fnum);
664 				//Process the reads
665 				ArrayList<Read> list=processFileT(fna, gff, types);
666 				if(ros!=null){
667 					if(list==null){list=dummy;}
668 					ros.add(list, fnum);
669 				}
670 			}
671 
672 			//Do anything necessary after processing
673 
674 			//Indicate successful exit status
675 			success=true;
676 		}
677 
678 		//Duplicated
processFileT(String fna, String gff, String types)679 		private ArrayList<Read> processFileT(String fna, String gff, String types){
680 			ArrayList<GffLine> lines=GffLine.loadGffFile(gff, types, false);
681 
682 			ArrayList<Read> list=ReadInputStream.toReads(fna, FileFormat.FA, -1);
683 			HashMap<String, Read> map=new HashMap<String, Read>();
684 
685 			for(Read r : list){
686 				readsProcessedT++;
687 				basesProcessedT+=r.length();
688 				map.put(r.id, r);
689 			}
690 
691 			if(renameByTaxID){//Note this must be AFTER adding to the hashmap.
692 				renameByTaxID(list);
693 			}
694 //			assert(false) : renameByTaxID+", "+list.size();
695 
696 			ArrayList<Read> outList=processLines(lines, map, invert);
697 
698 			if(invert){
699 				for(Read r : list){
700 					readsOutT++;
701 					basesOutT+=r.length();
702 				}
703 				return list;
704 			}else{
705 				if(outList!=null){
706 					for(Read r : outList){
707 						readsOutT++;
708 						basesOutT+=r.length();
709 					}
710 				}
711 				return outList;
712 			}
713 		}
714 
715 		/** Number of reads processed by this thread */
716 		protected long readsProcessedT=0;
717 		/** Number of bases processed by this thread */
718 		protected long basesProcessedT=0;
719 
720 		/** Number of reads retained by this thread */
721 		protected long readsOutT=0;
722 		/** Number of bases retained by this thread */
723 		protected long basesOutT=0;
724 
725 		/** True only if this thread has completed successfully */
726 		boolean success=false;
727 
728 		/** Shared output stream */
729 		private final ConcurrentReadOutputStream ros;
730 		/** Thread ID */
731 		final int tid;
732 		/** Next file ID */
733 		final AtomicInteger atom;
734 	}
735 
736 	/*--------------------------------------------------------------*/
737 	/*----------------            Fields            ----------------*/
738 	/*--------------------------------------------------------------*/
739 
740 	private ArrayList<String> fnaList=new ArrayList<String>();
741 	private ArrayList<String> gffList=new ArrayList<String>();
742 	private String out=null;
743 	private String types="CDS";
744 	private boolean invert=false;
745 	private boolean banPartial=true;
746 	private int minLen=1;
747 	private int maxLen=Integer.MAX_VALUE;
748 
749 	private String[] requiredAttributes;
750 	private String[] bannedAttributes;
751 
752 	/*--------------------------------------------------------------*/
753 
754 	private long bytesOut=0;
755 	private boolean renameByTaxID=false;
756 	private int taxMode=ACCESSION_MODE;
757 	private boolean requirePresent=false;
758 	private boolean alignRibo=false;
759 	private boolean adjustEndpoints=false;
760 	private boolean onePerFile=false;
761 	private boolean pickBest=false;
762 	private int ssuSlop=999;
763 	private int lsuSlop=999;
764 
765 	private float ID_MULT=0.96f;
766 
767 	private int maxNs=-1;
768 	private double maxNFraction=-1;
769 
770 	private static int ACCESSION_MODE=0, GI_MODE=1, HEADER_MODE=2, TAXID_MODE=3;
771 
772 	/*--------------------------------------------------------------*/
773 
774 	/** Number of reads processed */
775 	protected long readsProcessed=0;
776 	/** Number of bases processed */
777 	protected long basesProcessed=0;
778 
779 	/** Number of reads retained */
780 	protected long readsOut=0;
781 	/** Number of bases retained */
782 	protected long basesOut=0;
783 
784 	protected AtomicLong flipped=new AtomicLong(0);
785 	protected AtomicLong failed=new AtomicLong(0);
786 
787 	/** Quit after processing this many input reads; -1 means no limit */
788 	private long maxReads=-1;
789 
790 	/*--------------------------------------------------------------*/
791 	/*----------------         Final Fields         ----------------*/
792 	/*--------------------------------------------------------------*/
793 
794 	private final FileFormat ffout;
795 	private final ArrayList<Read> dummy=new ArrayList<Read>();
796 
797 	/*--------------------------------------------------------------*/
798 	/*----------------        Common Fields         ----------------*/
799 	/*--------------------------------------------------------------*/
800 
801 	private PrintStream outstream=System.err;
802 	public static boolean verbose=false;
803 	public boolean errorState=false;
804 	public boolean ordered=true;
805 	private boolean overwrite=true;
806 	private boolean append=false;
807 
808 }
809