1 package stream;
2 
3 import java.util.ArrayList;
4 import java.util.Arrays;
5 
6 import fileIO.FileFormat;
7 import fileIO.ReadWrite;
8 import shared.Shared;
9 import structures.ListNum;
10 
11 /**
12  * Abstract superclass of all ConcurrentReadStreamInterface implementations.
13  * ConcurrentReadInputStreams allow paired reads from twin files to be treated as a single stream.
14  * @author Brian Bushnell
15  * @date Nov 26, 2014
16  *
17  */
18 public abstract class ConcurrentReadInputStream implements ConcurrentReadStreamInterface {
19 
20 	/*--------------------------------------------------------------*/
21 	/*----------------        Initialization        ----------------*/
22 	/*--------------------------------------------------------------*/
23 
ConcurrentReadInputStream()24 	protected ConcurrentReadInputStream(){}
25 
26 	/**
27 	 * Special method for testing, handles some parsing.
28 	 * Used by MultiCros.
29 	 * Not really necessary; safe to remove.
30 	 */
getReadInputStream(long maxReads, boolean keepSamHeader, boolean allowSubprocess, String...args)31 	protected static ConcurrentReadInputStream getReadInputStream(long maxReads, boolean keepSamHeader, boolean allowSubprocess, String...args){
32 		assert(args.length>0) : Arrays.toString(args);
33 		for(int i=0; i<args.length; i++){
34 			if("null".equalsIgnoreCase(args[i])){args[i]=null;}
35 		}
36 		assert(args[0]!=null) : Arrays.toString(args);
37 
38 		assert(args.length<2 || !args[0].equalsIgnoreCase(args[1]));
39 		String in1=args[0], in2=null, qf1=null, qf2=null;
40 		if(args.length>1){in2=args[1];}
41 		if(args.length>2){qf1=args[2];}
42 		if(args.length>3){qf2=args[3];}
43 
44 		final FileFormat ff1=FileFormat.testInput(in1, null, allowSubprocess);
45 		final FileFormat ff2=FileFormat.testInput(in2, null, allowSubprocess);
46 
47 		return getReadInputStream(maxReads, keepSamHeader, ff1, ff2, qf1, qf2);
48 	}
49 
50 	/** @See primary method */
getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2)51 	public static ConcurrentReadInputStream getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2){
52 		return getReadInputStream(maxReads, keepSamHeader, ff1, ff2, (String)null, (String)null, Shared.USE_MPI, Shared.MPI_KEEP_ALL);
53 	}
54 
55 	/** @See primary method */
getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2, final boolean mpi, final boolean keepAll)56 	public static ConcurrentReadInputStream getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2,
57 			final boolean mpi, final boolean keepAll){
58 		return getReadInputStream(maxReads, keepSamHeader, ff1, ff2, (String)null, (String)null, mpi, keepAll);
59 	}
60 
61 //	/** @See primary method */
62 //	public static ConcurrentReadInputStream getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, String qf1){
63 //		return getReadInputStream(maxReads, keepSamHeader, ff1, null, qf1, null, Shared.USE_MPI, Shared.MPI_KEEP_ALL);
64 //	}
65 
66 	/** @See primary method */
getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2, String qf1, String qf2)67 	public static ConcurrentReadInputStream getReadInputStream(long maxReads, boolean keepSamHeader,
68 			FileFormat ff1, FileFormat ff2, String qf1, String qf2){
69 		return getReadInputStream(maxReads, keepSamHeader, ff1, ff2, qf1, qf2, Shared.USE_MPI, Shared.MPI_KEEP_ALL);
70 	}
71 
72 	/**
73 	 * @param maxReads Quit producing after this many reads (or pairs)
74 	 * @param keepSamHeader If the input is sam, store the header in the static shared header object
75 	 * @param ff1 Read 1 file (required)
76 	 * @param ff2 Read 2 file (optional)
77 	 * @param qf1 Qual file 1 (optional)
78 	 * @param qf2 Qual file 2 (optional)
79 	 * @param mpi True if MPI will be used
80 	 * @param keepAll In MPI mode, tells this stream to keep all reads instead of just a fraction
81 	 * @return
82 	 */
getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2, String qf1, String qf2, final boolean mpi, final boolean keepAll)83 	public static ConcurrentReadInputStream getReadInputStream(long maxReads, boolean keepSamHeader,
84 			FileFormat ff1, FileFormat ff2, String qf1, String qf2, final boolean mpi, final boolean keepAll){
85 		if(mpi){
86 			final int rank=Shared.MPI_RANK;
87 			final ConcurrentReadInputStream cris0;
88 			if(rank==0){
89 				cris0=getReadInputStream(maxReads, keepSamHeader, ff1, ff2, qf1, qf2, false, true);
90 				cris0.start();
91 			}else{
92 				cris0=null;
93 			}
94 			final ConcurrentReadInputStream crisD;
95 			if(Shared.USE_CRISMPI){
96 				assert(false) : "To support MPI, uncomment this.";
97 //				crisD=new ConcurrentReadInputStreamMPI(cris0, rank==0, keepAll);
98 				crisD=null;
99 			}else{
100 				crisD=new ConcurrentReadInputStreamD(cris0, rank==0, keepAll);
101 			}
102 			return crisD;
103 		}
104 
105 		assert(ff1!=null);
106 		assert(ff2==null || ff1.name()==null || !ff1.name().equalsIgnoreCase(ff2.name())) : ff1.name()+", "+ff2.name();
107 		assert(qf1==null || ff1.name()==null || !ff1.name().equalsIgnoreCase(qf2));
108 		assert(qf1==null || qf2==null || qf1.equalsIgnoreCase(qf2));
109 
110 		final ConcurrentReadInputStream cris;
111 
112 		if(ff1.fastq()){
113 
114 			ReadInputStream ris1, ris2;
115 
116 			ris1=new FastqReadInputStream(ff1);
117 			try {
118 				ris2=(ff2==null ? null : new FastqReadInputStream(ff2));
119 			} catch (AssertionError e) {//Handles problems with quality score autodetection
120 				ris1.close();
121 				throw e;
122 			}
123 			cris=new ConcurrentGenericReadInputStream(ris1, ris2, maxReads);
124 
125 		}else if(ff1.oneline()){
126 
127 			ReadInputStream ris1=new OnelineReadInputStream(ff1);
128 			ReadInputStream ris2=(ff2==null ? null : new OnelineReadInputStream(ff2));
129 			cris=new ConcurrentGenericReadInputStream(ris1, ris2, maxReads);
130 
131 		}else if(ff1.fasta()){
132 
133 			ReadInputStream ris1;
134 			ReadInputStream ris2;
135 			if(ff1.preferShreds()){
136 				ris1=new FastaShredInputStream(ff1, Shared.AMINO_IN, ff2==null ? Shared.bufferData() : -1);
137 				ris2=(ff2==null ? null : new FastaShredInputStream(ff2, Shared.AMINO_IN, -1));
138 			}else{
139 				ris1=(qf1==null ? new FastaReadInputStream(ff1, (FASTQ.FORCE_INTERLEAVED && ff2==null), Shared.AMINO_IN, ff2==null ? Shared.bufferData() : -1)
140 						: new FastaQualReadInputStream(ff1, qf1));
141 				ris2=(ff2==null ? null : qf2==null ? new FastaReadInputStream(ff2, false, Shared.AMINO_IN, -1) : new FastaQualReadInputStream(ff2, qf2));
142 			}
143 			cris=new ConcurrentGenericReadInputStream(ris1, ris2, maxReads);
144 
145 //			cris.start();
146 //			ListNum<Read> ln=cris.nextList();
147 //			System.out.println(ln);
148 //
149 //			assert(false) : ff1+", "+ff2;
150 		}else if(ff1.scarf()){
151 
152 			ReadInputStream ris1=new ScarfReadInputStream(ff1);
153 			ReadInputStream ris2=(ff2==null ? null : new ScarfReadInputStream(ff2));
154 			cris=new ConcurrentGenericReadInputStream(ris1, ris2, maxReads);
155 
156 		}else if(ff1.samOrBam()){
157 
158 			ReadInputStream ris1=new SamReadInputStream(ff1, keepSamHeader, FASTQ.FORCE_INTERLEAVED);
159 			ReadInputStream ris2=(ff2==null ? null : new SamReadInputStream(ff2, false, false));
160 			cris=new ConcurrentGenericReadInputStream(ris1, ris2, maxReads);
161 
162 		}else if(ff1.bread()){
163 //			assert(false) : ff1;
164 			RTextInputStream rtis=new RTextInputStream(ff1, ff2, maxReads);
165 			cris=new ConcurrentLegacyReadInputStream(rtis, maxReads); //TODO: Change to generic
166 
167 		}else if(ff1.header()){
168 
169 			HeaderInputStream ris1=new HeaderInputStream(ff1);
170 			HeaderInputStream ris2=(ff2==null ? null : new HeaderInputStream(ff2));
171 			cris=new ConcurrentGenericReadInputStream(ris1, ris2, maxReads);
172 
173 		}else if(ff1.sequential()){
174 
175 			SequentialReadInputStream ris=new SequentialReadInputStream(maxReads, 200, 50, 0, false);
176 			cris=new ConcurrentLegacyReadInputStream(ris, maxReads);
177 
178 		}else if(ff1.csfasta()){
179 
180 			throw new RuntimeException("csfasta is no longer supported.");
181 
182 		}else if(ff1.random()){
183 
184 			RandomReadInputStream3 ris=new RandomReadInputStream3(maxReads, FASTQ.FORCE_INTERLEAVED);
185 			cris=new ConcurrentGenericReadInputStream(ris, null, maxReads);
186 
187 		}else if(ff1.embl()){
188 
189 			EmblReadInputStream ris=new EmblReadInputStream(ff1);
190 			cris=new ConcurrentGenericReadInputStream(ris, null, maxReads);
191 
192 		}else if(ff1.gbk()){
193 
194 			GbkReadInputStream ris=new GbkReadInputStream(ff1);
195 			cris=new ConcurrentGenericReadInputStream(ris, null, maxReads);
196 
197 		}else{
198 			cris=null;
199 			throw new RuntimeException(""+ff1);
200 		}
201 
202 		return cris;
203 	}
204 
205 
206 	/*--------------------------------------------------------------*/
207 	/*----------------         Outer Methods        ----------------*/
208 	/*--------------------------------------------------------------*/
209 
getReads(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2, String qf1, String qf2)210 	public static ArrayList<Read> getReads(long maxReads, boolean keepSamHeader,
211 			FileFormat ff1, FileFormat ff2, String qf1, String qf2){
212 		ConcurrentReadInputStream cris=getReadInputStream(maxReads, keepSamHeader, ff1, ff2, qf1, qf2, Shared.USE_MPI, Shared.MPI_KEEP_ALL);
213 		cris.start();
214 		return cris.getReads();
215 	}
216 
getReads()217 	public ArrayList<Read> getReads(){
218 
219 		ListNum<Read> ln=nextList();
220 		ArrayList<Read> reads=(ln!=null ? ln.list : null);
221 
222 		ArrayList<Read> out=new ArrayList<Read>();
223 
224 		while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
225 			out.addAll(reads);
226 			returnList(ln.id, ln.list.isEmpty());
227 			ln=nextList();
228 			reads=(ln!=null ? ln.list : null);
229 		}
230 		if(ln!=null){
231 			returnList(ln.id, ln.list==null || ln.list.isEmpty());
232 		}
233 		boolean error=ReadWrite.closeStream(this);
234 		if(error){
235 			System.err.println("Warning - an error was encountered during read input.");
236 		}
237 		return out;
238 	}
239 
240 	@Override
start()241 	public void start(){
242 //		System.err.println("Starting "+this);
243 		new Thread(this).start(); //Prevents a strange deadlock in ConcurrentCollectionReadInputStream
244 		started=true;
245 	}
246 
started()247 	public final boolean started(){return started;}
248 
249 
250 	/*--------------------------------------------------------------*/
251 	/*----------------       Abstract Methods       ----------------*/
252 	/*--------------------------------------------------------------*/
253 
254 	@Override
nextList()255 	public abstract ListNum<Read> nextList();
256 
257 	@Override
returnList(ListNum<Read> ln)258 	public final void returnList(ListNum<Read> ln){
259 		if(ln!=null){returnList(ln.id, ln.isEmpty());}
260 	}
261 
262 	@Override
returnList(long listNum, boolean poison)263 	public abstract void returnList(long listNum, boolean poison);
264 
265 	@Override
run()266 	public abstract void run();
267 
268 	@Override
shutdown()269 	public abstract void shutdown();
270 
271 	@Override
restart()272 	public abstract void restart();
273 
274 	@Override
close()275 	public abstract void close();
276 
277 	@Override
paired()278 	public abstract boolean paired();
279 
280 	@Override
producers()281 	public abstract Object[] producers();
282 
283 	@Override
errorState()284 	public abstract boolean errorState();
285 
286 	@Override
setSampleRate(float rate, long seed)287 	public abstract void setSampleRate(float rate, long seed);
288 
289 	@Override
basesIn()290 	public abstract long basesIn();
291 
292 	@Override
readsIn()293 	public abstract long readsIn();
294 
295 	@Override
verbose()296 	public abstract boolean verbose();
297 
298 	/*--------------------------------------------------------------*/
299 	/*----------------            Fields            ----------------*/
300 	/*--------------------------------------------------------------*/
301 
302 	final int BUF_LEN=Shared.bufferLen();;
303 	final int NUM_BUFFS=Shared.numBuffers();
304 	final long MAX_DATA=Shared.bufferData();
305 	public boolean ALLOW_UNEQUAL_LENGTHS=false;
306 	boolean started=false;
307 
308 	/*--------------------------------------------------------------*/
309 	/*----------------         Static Fields        ----------------*/
310 	/*--------------------------------------------------------------*/
311 
312 	public static boolean SHOW_PROGRESS=false;
313 	public static boolean SHOW_PROGRESS2=false; //Indicate time in seconds between dots.
314 	public static long PROGRESS_INCR=1000000;
315 	public static boolean REMOVE_DISCARDED_READS=false;
316 
317 }
318