1 package stream; 2 3 import java.util.ArrayList; 4 import java.util.Arrays; 5 6 import fileIO.FileFormat; 7 import fileIO.ReadWrite; 8 import shared.Shared; 9 import structures.ListNum; 10 11 /** 12 * Abstract superclass of all ConcurrentReadStreamInterface implementations. 13 * ConcurrentReadInputStreams allow paired reads from twin files to be treated as a single stream. 14 * @author Brian Bushnell 15 * @date Nov 26, 2014 16 * 17 */ 18 public abstract class ConcurrentReadInputStream implements ConcurrentReadStreamInterface { 19 20 /*--------------------------------------------------------------*/ 21 /*---------------- Initialization ----------------*/ 22 /*--------------------------------------------------------------*/ 23 ConcurrentReadInputStream()24 protected ConcurrentReadInputStream(){} 25 26 /** 27 * Special method for testing, handles some parsing. 28 * Used by MultiCros. 29 * Not really necessary; safe to remove. 30 */ getReadInputStream(long maxReads, boolean keepSamHeader, boolean allowSubprocess, String...args)31 protected static ConcurrentReadInputStream getReadInputStream(long maxReads, boolean keepSamHeader, boolean allowSubprocess, String...args){ 32 assert(args.length>0) : Arrays.toString(args); 33 for(int i=0; i<args.length; i++){ 34 if("null".equalsIgnoreCase(args[i])){args[i]=null;} 35 } 36 assert(args[0]!=null) : Arrays.toString(args); 37 38 assert(args.length<2 || !args[0].equalsIgnoreCase(args[1])); 39 String in1=args[0], in2=null, qf1=null, qf2=null; 40 if(args.length>1){in2=args[1];} 41 if(args.length>2){qf1=args[2];} 42 if(args.length>3){qf2=args[3];} 43 44 final FileFormat ff1=FileFormat.testInput(in1, null, allowSubprocess); 45 final FileFormat ff2=FileFormat.testInput(in2, null, allowSubprocess); 46 47 return getReadInputStream(maxReads, keepSamHeader, ff1, ff2, qf1, qf2); 48 } 49 50 /** @See primary method */ getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2)51 public static ConcurrentReadInputStream getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2){ 52 return getReadInputStream(maxReads, keepSamHeader, ff1, ff2, (String)null, (String)null, Shared.USE_MPI, Shared.MPI_KEEP_ALL); 53 } 54 55 /** @See primary method */ getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2, final boolean mpi, final boolean keepAll)56 public static ConcurrentReadInputStream getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2, 57 final boolean mpi, final boolean keepAll){ 58 return getReadInputStream(maxReads, keepSamHeader, ff1, ff2, (String)null, (String)null, mpi, keepAll); 59 } 60 61 // /** @See primary method */ 62 // public static ConcurrentReadInputStream getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, String qf1){ 63 // return getReadInputStream(maxReads, keepSamHeader, ff1, null, qf1, null, Shared.USE_MPI, Shared.MPI_KEEP_ALL); 64 // } 65 66 /** @See primary method */ getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2, String qf1, String qf2)67 public static ConcurrentReadInputStream getReadInputStream(long maxReads, boolean keepSamHeader, 68 FileFormat ff1, FileFormat ff2, String qf1, String qf2){ 69 return getReadInputStream(maxReads, keepSamHeader, ff1, ff2, qf1, qf2, Shared.USE_MPI, Shared.MPI_KEEP_ALL); 70 } 71 72 /** 73 * @param maxReads Quit producing after this many reads (or pairs) 74 * @param keepSamHeader If the input is sam, store the header in the static shared header object 75 * @param ff1 Read 1 file (required) 76 * @param ff2 Read 2 file (optional) 77 * @param qf1 Qual file 1 (optional) 78 * @param qf2 Qual file 2 (optional) 79 * @param mpi True if MPI will be used 80 * @param keepAll In MPI mode, tells this stream to keep all reads instead of just a fraction 81 * @return 82 */ getReadInputStream(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2, String qf1, String qf2, final boolean mpi, final boolean keepAll)83 public static ConcurrentReadInputStream getReadInputStream(long maxReads, boolean keepSamHeader, 84 FileFormat ff1, FileFormat ff2, String qf1, String qf2, final boolean mpi, final boolean keepAll){ 85 if(mpi){ 86 final int rank=Shared.MPI_RANK; 87 final ConcurrentReadInputStream cris0; 88 if(rank==0){ 89 cris0=getReadInputStream(maxReads, keepSamHeader, ff1, ff2, qf1, qf2, false, true); 90 cris0.start(); 91 }else{ 92 cris0=null; 93 } 94 final ConcurrentReadInputStream crisD; 95 if(Shared.USE_CRISMPI){ 96 assert(false) : "To support MPI, uncomment this."; 97 // crisD=new ConcurrentReadInputStreamMPI(cris0, rank==0, keepAll); 98 crisD=null; 99 }else{ 100 crisD=new ConcurrentReadInputStreamD(cris0, rank==0, keepAll); 101 } 102 return crisD; 103 } 104 105 assert(ff1!=null); 106 assert(ff2==null || ff1.name()==null || !ff1.name().equalsIgnoreCase(ff2.name())) : ff1.name()+", "+ff2.name(); 107 assert(qf1==null || ff1.name()==null || !ff1.name().equalsIgnoreCase(qf2)); 108 assert(qf1==null || qf2==null || qf1.equalsIgnoreCase(qf2)); 109 110 final ConcurrentReadInputStream cris; 111 112 if(ff1.fastq()){ 113 114 ReadInputStream ris1, ris2; 115 116 ris1=new FastqReadInputStream(ff1); 117 try { 118 ris2=(ff2==null ? null : new FastqReadInputStream(ff2)); 119 } catch (AssertionError e) {//Handles problems with quality score autodetection 120 ris1.close(); 121 throw e; 122 } 123 cris=new ConcurrentGenericReadInputStream(ris1, ris2, maxReads); 124 125 }else if(ff1.oneline()){ 126 127 ReadInputStream ris1=new OnelineReadInputStream(ff1); 128 ReadInputStream ris2=(ff2==null ? null : new OnelineReadInputStream(ff2)); 129 cris=new ConcurrentGenericReadInputStream(ris1, ris2, maxReads); 130 131 }else if(ff1.fasta()){ 132 133 ReadInputStream ris1; 134 ReadInputStream ris2; 135 if(ff1.preferShreds()){ 136 ris1=new FastaShredInputStream(ff1, Shared.AMINO_IN, ff2==null ? Shared.bufferData() : -1); 137 ris2=(ff2==null ? null : new FastaShredInputStream(ff2, Shared.AMINO_IN, -1)); 138 }else{ 139 ris1=(qf1==null ? new FastaReadInputStream(ff1, (FASTQ.FORCE_INTERLEAVED && ff2==null), Shared.AMINO_IN, ff2==null ? Shared.bufferData() : -1) 140 : new FastaQualReadInputStream(ff1, qf1)); 141 ris2=(ff2==null ? null : qf2==null ? new FastaReadInputStream(ff2, false, Shared.AMINO_IN, -1) : new FastaQualReadInputStream(ff2, qf2)); 142 } 143 cris=new ConcurrentGenericReadInputStream(ris1, ris2, maxReads); 144 145 // cris.start(); 146 // ListNum<Read> ln=cris.nextList(); 147 // System.out.println(ln); 148 // 149 // assert(false) : ff1+", "+ff2; 150 }else if(ff1.scarf()){ 151 152 ReadInputStream ris1=new ScarfReadInputStream(ff1); 153 ReadInputStream ris2=(ff2==null ? null : new ScarfReadInputStream(ff2)); 154 cris=new ConcurrentGenericReadInputStream(ris1, ris2, maxReads); 155 156 }else if(ff1.samOrBam()){ 157 158 ReadInputStream ris1=new SamReadInputStream(ff1, keepSamHeader, FASTQ.FORCE_INTERLEAVED); 159 ReadInputStream ris2=(ff2==null ? null : new SamReadInputStream(ff2, false, false)); 160 cris=new ConcurrentGenericReadInputStream(ris1, ris2, maxReads); 161 162 }else if(ff1.bread()){ 163 // assert(false) : ff1; 164 RTextInputStream rtis=new RTextInputStream(ff1, ff2, maxReads); 165 cris=new ConcurrentLegacyReadInputStream(rtis, maxReads); //TODO: Change to generic 166 167 }else if(ff1.header()){ 168 169 HeaderInputStream ris1=new HeaderInputStream(ff1); 170 HeaderInputStream ris2=(ff2==null ? null : new HeaderInputStream(ff2)); 171 cris=new ConcurrentGenericReadInputStream(ris1, ris2, maxReads); 172 173 }else if(ff1.sequential()){ 174 175 SequentialReadInputStream ris=new SequentialReadInputStream(maxReads, 200, 50, 0, false); 176 cris=new ConcurrentLegacyReadInputStream(ris, maxReads); 177 178 }else if(ff1.csfasta()){ 179 180 throw new RuntimeException("csfasta is no longer supported."); 181 182 }else if(ff1.random()){ 183 184 RandomReadInputStream3 ris=new RandomReadInputStream3(maxReads, FASTQ.FORCE_INTERLEAVED); 185 cris=new ConcurrentGenericReadInputStream(ris, null, maxReads); 186 187 }else if(ff1.embl()){ 188 189 EmblReadInputStream ris=new EmblReadInputStream(ff1); 190 cris=new ConcurrentGenericReadInputStream(ris, null, maxReads); 191 192 }else if(ff1.gbk()){ 193 194 GbkReadInputStream ris=new GbkReadInputStream(ff1); 195 cris=new ConcurrentGenericReadInputStream(ris, null, maxReads); 196 197 }else{ 198 cris=null; 199 throw new RuntimeException(""+ff1); 200 } 201 202 return cris; 203 } 204 205 206 /*--------------------------------------------------------------*/ 207 /*---------------- Outer Methods ----------------*/ 208 /*--------------------------------------------------------------*/ 209 getReads(long maxReads, boolean keepSamHeader, FileFormat ff1, FileFormat ff2, String qf1, String qf2)210 public static ArrayList<Read> getReads(long maxReads, boolean keepSamHeader, 211 FileFormat ff1, FileFormat ff2, String qf1, String qf2){ 212 ConcurrentReadInputStream cris=getReadInputStream(maxReads, keepSamHeader, ff1, ff2, qf1, qf2, Shared.USE_MPI, Shared.MPI_KEEP_ALL); 213 cris.start(); 214 return cris.getReads(); 215 } 216 getReads()217 public ArrayList<Read> getReads(){ 218 219 ListNum<Read> ln=nextList(); 220 ArrayList<Read> reads=(ln!=null ? ln.list : null); 221 222 ArrayList<Read> out=new ArrayList<Read>(); 223 224 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning 225 out.addAll(reads); 226 returnList(ln.id, ln.list.isEmpty()); 227 ln=nextList(); 228 reads=(ln!=null ? ln.list : null); 229 } 230 if(ln!=null){ 231 returnList(ln.id, ln.list==null || ln.list.isEmpty()); 232 } 233 boolean error=ReadWrite.closeStream(this); 234 if(error){ 235 System.err.println("Warning - an error was encountered during read input."); 236 } 237 return out; 238 } 239 240 @Override start()241 public void start(){ 242 // System.err.println("Starting "+this); 243 new Thread(this).start(); //Prevents a strange deadlock in ConcurrentCollectionReadInputStream 244 started=true; 245 } 246 started()247 public final boolean started(){return started;} 248 249 250 /*--------------------------------------------------------------*/ 251 /*---------------- Abstract Methods ----------------*/ 252 /*--------------------------------------------------------------*/ 253 254 @Override nextList()255 public abstract ListNum<Read> nextList(); 256 257 @Override returnList(ListNum<Read> ln)258 public final void returnList(ListNum<Read> ln){ 259 if(ln!=null){returnList(ln.id, ln.isEmpty());} 260 } 261 262 @Override returnList(long listNum, boolean poison)263 public abstract void returnList(long listNum, boolean poison); 264 265 @Override run()266 public abstract void run(); 267 268 @Override shutdown()269 public abstract void shutdown(); 270 271 @Override restart()272 public abstract void restart(); 273 274 @Override close()275 public abstract void close(); 276 277 @Override paired()278 public abstract boolean paired(); 279 280 @Override producers()281 public abstract Object[] producers(); 282 283 @Override errorState()284 public abstract boolean errorState(); 285 286 @Override setSampleRate(float rate, long seed)287 public abstract void setSampleRate(float rate, long seed); 288 289 @Override basesIn()290 public abstract long basesIn(); 291 292 @Override readsIn()293 public abstract long readsIn(); 294 295 @Override verbose()296 public abstract boolean verbose(); 297 298 /*--------------------------------------------------------------*/ 299 /*---------------- Fields ----------------*/ 300 /*--------------------------------------------------------------*/ 301 302 final int BUF_LEN=Shared.bufferLen();; 303 final int NUM_BUFFS=Shared.numBuffers(); 304 final long MAX_DATA=Shared.bufferData(); 305 public boolean ALLOW_UNEQUAL_LENGTHS=false; 306 boolean started=false; 307 308 /*--------------------------------------------------------------*/ 309 /*---------------- Static Fields ----------------*/ 310 /*--------------------------------------------------------------*/ 311 312 public static boolean SHOW_PROGRESS=false; 313 public static boolean SHOW_PROGRESS2=false; //Indicate time in seconds between dots. 314 public static long PROGRESS_INCR=1000000; 315 public static boolean REMOVE_DISCARDED_READS=false; 316 317 } 318