1 package pacbio;
2 
3 import java.io.File;
4 import java.util.ArrayList;
5 import java.util.HashMap;
6 
7 import align2.MultiStateAligner9PacBio;
8 import dna.AminoAcid;
9 import dna.ChromosomeArray;
10 import dna.Data;
11 import fileIO.ReadWrite;
12 import fileIO.TextFile;
13 import fileIO.TextStreamWriter;
14 import shared.Parse;
15 import shared.PreParser;
16 import shared.Shared;
17 import shared.Timer;
18 import shared.Tools;
19 import stream.ConcurrentLegacyReadInputStream;
20 import stream.RTextInputStream;
21 import stream.Read;
22 import stream.SiteScore;
23 import stream.SiteScoreR;
24 import structures.CoverageArray;
25 import structures.CoverageArray2;
26 import structures.ListNum;
27 
28 /**
29  * @author Brian Bushnell
30  * @date Jul 16, 2012
31  *
32  */
33 public class StackSites2 {
34 
main(String[] args)35 	public static void main(String[] args){
36 		{//Preparse block for help, config files, and outstream
37 			PreParser pp=new PreParser(args, new Object() { }.getClass().getEnclosingClass(), false);
38 			args=pp.args;
39 			//outstream=pp.outstream;
40 		}
41 
42 		Timer t=new Timer();
43 
44 		String tempname=null;
45 		Data.GENOME_BUILD=-1;
46 
47 		for(int i=4; i<args.length; i++){
48 			final String arg=args[i];
49 			final String[] split=arg.split("=");
50 			String a=split[0].toLowerCase();
51 			String b=split.length>1 ? split[1] : null;
52 
53 			if(a.equals("genome") || a.equals("build")){
54 				Data.setGenome(Integer.parseInt(b));
55 			}else if(a.equals("tempname")){
56 				tempname=b;
57 			}else if(a.equals("deletefiles") || a.startsWith("deletetemp") || a.equals("delete")){
58 				DELETE_TEMP=(Parse.parseBoolean(b));
59 			}else if(a.equals("blocksize")){
60 				BLOCKSIZE=(Integer.parseInt(b));
61 			}else{
62 				throw new RuntimeException("Unknown parameter "+args[i]);
63 			}
64 		}
65 
66 		if(Data.GENOME_BUILD<0){throw new RuntimeException("Please specify genome build.");}
67 
68 		stack(args[0], args[1], args[2], args[3], tempname);
69 		t.stop();
70 		System.out.println("Time: \t"+t);
71 	}
72 
stack(String fname1, String fname2, String outname, String pcovoutname, String tempname)73 	public static void stack(String fname1, String fname2, String outname, String pcovoutname, String tempname){
74 		assert(pcovoutname.contains("#"));
75 		final RTextInputStream rtis=new RTextInputStream(fname1, (fname2==null || fname2.equals("null") ? null : fname2), -1);
76 		final ConcurrentLegacyReadInputStream cris=new ConcurrentLegacyReadInputStream(rtis, -1);
77 
78 		cris.start();
79 		System.err.println("Started cris");
80 		final boolean paired=cris.paired();
81 		System.err.println("Paired: "+paired);
82 
83 		final ArrayList<CoverageArray> pcov;
84 		final ArrayList<CoverageArray> truePcov;
85 		final ArrayList<CoverageArray> cov;
86 
87 		{
88 			int len=(Data.GENOME_BUILD<0 ? 8 : Data.numChroms+1);
89 
90 			pcov=new ArrayList<CoverageArray>(len);
91 			truePcov=new ArrayList<CoverageArray>(len);
92 			cov=new ArrayList<CoverageArray>(len);
93 
94 			System.out.println("len="+len+"; Data.numChroms="+Data.numChroms);
95 
96 			pcov.add(null);
97 			truePcov.add(null);
98 			cov.add(null);
99 
100 			for(int i=1; i<len; i++){
101 				if(Data.GENOME_BUILD<0){
102 					pcov.add(new CoverageArray2(-1, 500));
103 					truePcov.add(new CoverageArray2(-1, 500));
104 					cov.add(new CoverageArray2(-1, 500));
105 				}else{
106 					pcov.add(new CoverageArray2(-1, Data.chromLengths[i]+1));
107 					truePcov.add(new CoverageArray2(-1, Data.chromLengths[i]+1));
108 					cov.add(new CoverageArray2(-1, Data.chromLengths[i]+1));
109 				}
110 			}
111 		}
112 
113 
114 		final Glob g=new Glob(tempname);
115 
116 		{
117 			ListNum<Read> ln=cris.nextList();
118 			ArrayList<Read> reads=(ln!=null ? ln.list : null);
119 
120 			if(reads!=null && !reads.isEmpty()){
121 				Read r=reads.get(0);
122 				assert(paired==(r.mate!=null));
123 			}
124 
125 			while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
126 				//System.err.println("reads.size()="+reads.size());
127 				for(Read r : reads){
128 					readsProcessed++;
129 
130 //					System.out.println("Processing read "+r.numericID);
131 
132 					if(r!=null){
133 						if(r.sites!=null){
134 //							System.out.println("Adding "+r.list.size()+" sites.");
135 							SiteScore original=r.originalSite;
136 							for(SiteScore ss : r.sites){
137 								sitesProcessed++;
138 
139 								//TODO: Process perfect coverage
140 								{
141 									boolean b=false;
142 									if(ss.semiperfect){
143 										b=true;
144 									}else{//Check for no-refs
145 										int len=ss.stop-ss.start+1;
146 										if(len==r.length() && ss.slowScore>=0.5f*MultiStateAligner9PacBio.POINTS_MATCH2){
147 											b=checkPerfection(ss.start, ss.stop, r.bases, Data.getChromosome(ss.chrom), ss.strand==Shared.MINUS, 0.5f);
148 										}
149 									}
150 									if(b){
151 										while(pcov.size()<=ss.chrom){
152 											pcov.add(new CoverageArray2(pcov.size(), Data.chromLengths[pcov.size()]));
153 											truePcov.add(new CoverageArray2(truePcov.size(), Data.chromLengths[truePcov.size()]));
154 										}
155 										CoverageArray ca=pcov.get(ss.chrom);
156 										CoverageArray tca=truePcov.get(ss.chrom);
157 										for(int i=ss.start+PCOV_TIP_DIST; i<=ss.stop-PCOV_TIP_DIST; i++){
158 											ca.increment(i);
159 										}
160 										if(ss.perfect){
161 											for(int i=ss.start; i<=ss.stop; i++){
162 												tca.increment(i);
163 											}
164 										}
165 									}
166 									{
167 										while(cov.size()<=ss.chrom){
168 											cov.add(new CoverageArray2(cov.size(), Data.chromLengths[cov.size()]));
169 										}
170 										CoverageArray ca=cov.get(ss.chrom);
171 										for(int i=ss.start; i<=ss.stop; i++){
172 											ca.increment(i);
173 										}
174 									}
175 								}
176 
177 								SiteScoreR ssr=new SiteScoreR(ss, r.length(), r.numericID, (byte)r.pairnum());
178 
179 								if(original!=null){
180 									ssr.correct=isCorrectHitLoose(ss, original.chrom, original.strand, original.start, original.stop, 40, false);
181 								}
182 
183 								g.write(ssr);
184 							}
185 //							System.out.println(sitesProcessed);
186 						}
187 					}
188 
189 					if(r.mate!=null){
190 						Read r2=r.mate;
191 						if(r2.sites!=null){
192 
193 							SiteScore original=r2.originalSite;
194 							for(SiteScore ss : r2.sites){
195 								sitesProcessed++;
196 
197 								{
198 									boolean b=false;
199 									if(ss.semiperfect){
200 										b=true;
201 									}else{//Check for no-refs
202 										int len=ss.stop-ss.start+1;
203 										if(len==r2.length() && ss.slowScore>=0.5f*MultiStateAligner9PacBio.POINTS_MATCH2){
204 											b=checkPerfection(ss.start, ss.stop, r2.bases, Data.getChromosome(ss.chrom), ss.strand==Shared.MINUS, 0.5f);
205 										}
206 									}
207 									if(b){
208 										while(pcov.size()<=ss.chrom){
209 											pcov.add(new CoverageArray2(pcov.size(), Data.chromLengths[pcov.size()]));
210 											truePcov.add(new CoverageArray2(truePcov.size(), Data.chromLengths[truePcov.size()]));
211 										}
212 										CoverageArray ca=pcov.get(ss.chrom);
213 										CoverageArray tca=truePcov.get(ss.chrom);
214 										for(int i=ss.start+PCOV_TIP_DIST; i<=ss.stop-PCOV_TIP_DIST; i++){
215 											ca.increment(i);
216 										}
217 										if(ss.perfect){
218 											for(int i=ss.start; i<=ss.stop; i++){
219 												tca.increment(i);
220 											}
221 										}
222 									}
223 									{
224 										while(cov.size()<=ss.chrom){
225 											cov.add(new CoverageArray2(cov.size(), Data.chromLengths[cov.size()]));
226 										}
227 										CoverageArray ca=cov.get(ss.chrom);
228 										for(int i=ss.start; i<=ss.stop; i++){
229 											ca.increment(i);
230 										}
231 									}
232 								}
233 
234 								SiteScoreR ssr=new SiteScoreR(ss, r2.length(), r2.numericID, (byte)r2.pairnum());
235 
236 								if(original!=null){
237 									ssr.correct=isCorrectHitLoose(ss, original.chrom, original.strand, original.start, original.stop, 40, false);
238 								}
239 
240 								g.write(ssr);
241 							}
242 						}
243 					}
244 
245 //					System.out.println(r.toString());
246 //					assert(r.list!=null);
247 //					assert(r.list.size()>0);
248 
249 				}
250 				//System.err.println("returning list");
251 				cris.returnList(ln);
252 				//System.err.println("fetching list");
253 				ln=cris.nextList();
254 				reads=(ln!=null ? ln.list : null);
255 			}
256 			System.out.println("Finished reading");
257 			cris.returnList(ln);
258 			System.out.println("Returned list");
259 			ReadWrite.closeStream(cris);
260 			System.out.println("Closed stream");
261 			System.out.println("Processed "+readsProcessed+" reads.");
262 			System.out.println("Processed "+sitesProcessed+" sites.");
263 		}
264 
265 
266 		for(int i=1; i<pcov.size(); i++){
267 			CoverageArray ca=pcov.get(i);
268 //			pcov.set(i, null);
269 			if(ca.maxIndex<.995*ca.arrayLength()){
270 				ca.resize(ca.maxIndex+1);
271 			}
272 			ReadWrite.writeObjectInThread(ca, pcovoutname.replaceFirst("#", ""+i), false);
273 		}
274 
275 		finish(g, outname, pcov, truePcov, cov);
276 		System.out.println("Retained  "+sitesOut+" sites.");
277 
278 	}
279 
280 	/** TODO - thread this by chrom */
finish(Glob g, String outname, ArrayList<CoverageArray> pcov, ArrayList<CoverageArray> truePcov, ArrayList<CoverageArray> cov)281 	private static final void finish(Glob g, String outname, ArrayList<CoverageArray> pcov, ArrayList<CoverageArray> truePcov, ArrayList<CoverageArray> cov){
282 
283 
284 		final TextStreamWriter out=new TextStreamWriter(outname, true, false, false);
285 		out.start();
286 		ArrayList<Long> keys=new ArrayList<Long>(g.wmap.size());
287 		keys.addAll(g.wmap.keySet());
288 		Shared.sort(keys);
289 		for(Long k : keys){
290 			TextStreamWriter tsw=g.wmap.get(k);
291 			tsw.poison();
292 		}
293 
294 
295 
296 		int chrom=0;
297 		int loc=INTERVAL;
298 		String tab="";
299 		StringBuilder sb=new StringBuilder(4000);
300 
301 		for(Long k : keys){
302 			TextStreamWriter tsw=g.wmap.get(k);
303 			String fname=Glob.fname(k, g.tempname);
304 			for(int i=0; i<50 && tsw.isAlive(); i++){
305 				try {
306 					tsw.join(20000);
307 				} catch (InterruptedException e) {
308 					// TODO Auto-generated catch block
309 					e.printStackTrace();
310 				}
311 				if(tsw.isAlive()){
312 					System.err.println("Waiting for tsw "+tsw.fname+" to finish...");
313 				}
314 			}
315 			if(tsw.isAlive()){
316 				System.err.println(tsw.getClass().getName()+" for "+fname+" refused to die after a long time.");
317 				assert(false);
318 			}
319 
320 			TextFile tf=new TextFile(fname, false);
321 			ArrayList<SiteScoreR> list=new ArrayList<SiteScoreR>(1000);
322 			for(String s=tf.nextLine(); s!=null; s=tf.nextLine()){
323 				SiteScoreR ssr=SiteScoreR.fromText(s);
324 
325 				assert(pcov.size()>=ssr.chrom) : ssr.chrom+", "+pcov.size()+", "+truePcov.size()+", "+cov.size();
326 				final int c=ssr.chrom;
327 				boolean retain=retainSite(ssr, (pcov.size()>c ? pcov.get(c) : FAKE), (truePcov.size()>c ? truePcov.get(c) : FAKE), (cov.size()>c ? cov.get(c) : null));
328 				if(retain){
329 					list.add(ssr);
330 					sitesOut++;
331 				}
332 			}
333 			tf.close();
334 			if(DELETE_TEMP){
335 				new File(fname).delete();
336 			}
337 			Shared.sort(list, SiteScoreR.PCOMP);
338 
339 			final int lim=list.size();
340 			for(int i=0; i<lim; i++){
341 				SiteScoreR ssr=list.get(i);
342 				list.set(i, null);
343 				if(ssr.chrom>chrom || ssr.start>=loc){
344 					if(sb.length()>0){//Purge to disk
345 						sb.append('\n');
346 						out.print(sb.toString());
347 						sb.setLength(0);
348 					}
349 					chrom=ssr.chrom;
350 					loc=ssr.start;
351 					loc=(loc-(loc%INTERVAL))+INTERVAL;
352 					assert(loc>ssr.start);
353 					assert(loc-ssr.start<=INTERVAL);
354 					assert(loc%INTERVAL==0);
355 					tab="";
356 				}
357 				sb.append(tab);
358 				sb.append(ssr.toText());
359 				tab="\t";
360 			}
361 
362 		}
363 
364 
365 		sb.append('\n');
366 		out.print(sb.toString());
367 		out.poisonAndWait();
368 	}
369 
retainSite(SiteScoreR ssr, CoverageArray pcov, CoverageArray tpcov, CoverageArray cov)370 	private static boolean retainSite(SiteScoreR ssr, CoverageArray pcov, CoverageArray tpcov, CoverageArray cov){
371 		if(ssr.semiperfect && !ssr.perfect){return true;} //For tip extension
372 		assert(cov!=null && cov!=FAKE) : (cov==FAKE)+", "+ssr.chrom;
373 
374 		if(!ssr.semiperfect){ //Typical flawed read
375 			assert(!ssr.perfect);
376 			boolean toss=true;
377 			if(pcov==null || tpcov==null){
378 				toss=false;
379 			}else{
380 				for(int j=ssr.start-PCOV_TIP_DIST; toss && j<=ssr.stop+PCOV_TIP_DIST; j++){
381 					toss=(pcov.get(j)>=MIN_PCOV_TO_TOSS && tpcov.get(j)>=MIN_PCOV_TO_TOSS);
382 				}
383 			}
384 			if(toss){
385 				for(int j=ssr.start; j<=ssr.stop; j++){cov.increment(j, -1);}
386 				return false;
387 			}
388 		}
389 
390 		boolean alwaysLowCov=true;
391 		boolean alwaysTooPerfect=true;
392 		boolean onlyPerfect=true;
393 
394 		for(int j=ssr.start; (alwaysLowCov || alwaysTooPerfect || onlyPerfect) && j<=ssr.stop; j++){
395 			int c=cov.get(j);
396 			int tp=tpcov.get(j);
397 
398 			alwaysLowCov=alwaysLowCov && c<MIN_COV_TO_RETAIN;
399 			alwaysTooPerfect=alwaysTooPerfect && c-tp<tp;
400 			onlyPerfect=onlyPerfect && tp>0;
401 		}
402 
403 		if(alwaysLowCov || (alwaysTooPerfect && !ssr.semiperfect) || onlyPerfect){
404 			if(!ssr.semiperfect){
405 				for(int j=ssr.start; j<=ssr.stop; j++){cov.increment(j, -1);}
406 			}
407 			return false;
408 		}
409 
410 		return true;
411 	}
412 
413 	private static boolean checkPerfection(int start, int stop, byte[] bases, ChromosomeArray cha, boolean rcomp, float f) {
414 
415 		int noref=0;
416 		if(rcomp){
417 			for(int i=0; i<bases.length; i++){
418 				byte a=AminoAcid.baseToComplementExtended[bases[bases.length-i-1]];
419 				byte b=cha.get(start+i);
420 				if(b=='N'){noref++;}
421 				else if(a!=b){return false;}
422 			}
423 		}else{
424 			for(int i=0; i<bases.length; i++){
425 				byte a=bases[i];
426 				byte b=cha.get(start+i);
427 				if(b=='N'){noref++;}
428 				else if(a!=b){return false;}
429 			}
430 		}
431 		return bases.length-noref>=f*bases.length;
432 	}
433 
434 	public static boolean isCorrectHitLoose(SiteScore ss, int trueChrom, byte trueStrand, int trueStart, int trueStop, int thresh, boolean useChrom){
435 		if((useChrom && ss.chrom!=trueChrom) || ss.strand!=trueStrand){return false;}
436 
437 		assert(ss.stop>ss.start) : ss.toText()+", "+trueStart+", "+trueStop;
438 		assert(trueStop>trueStart) : ss.toText()+", "+trueStart+", "+trueStop;
439 
440 		return (Tools.absdif(ss.start, trueStart)<=thresh || Tools.absdif(ss.stop, trueStop)<=thresh);
441 	}
442 
443 	private static class Glob{
444 
445 		public Glob(String tempPattern_){
446 			tempname=(tempPattern_ == null ? DEFAULT_TEMP_PATTERN : tempPattern_);
447 		}
448 
449 		public void write(SiteScoreR ssr){
450 			long key=key(ssr.chrom, ssr.start);
451 			TextStreamWriter tsw=wmap.get(key);
452 			if(tsw==null){
453 				String fname=fname(key, tempname);
454 				tsw=new TextStreamWriter(fname, true, false, false);
455 				tsw.start();
456 				wmap.put(key, tsw);
457 			}
458 			tsw.print(ssr.toText().append('\n'));
459 		}
460 
461 		protected static final long key(int chrom, int start){
462 			long k=((long)chrom<<32)+(Tools.max(start, 0))/BLOCKSIZE;
463 			return k;
464 		}
465 
466 		protected static final String fname(long key, String outname){
467 			if(outname==null){outname=DEFAULT_TEMP_PATTERN;}
468 			assert(outname.contains("#")) : outname;
469 			return outname.replace("#", "b"+Data.GENOME_BUILD+"_"+key);
470 		}
471 
472 		final HashMap<Long, TextStreamWriter> wmap=new HashMap<Long, TextStreamWriter>();
473 		final String tempname;
474 
475 	}
476 
477 	/** Sites will be written to files, each containing an index range of this size.
478 	 * Larger means fewer files, but more memory used when reading the files (at a later stage).
479 	 */
480 	public static int BLOCKSIZE=8000000;
481 
482 	/** Sites are grouped into intervals (by start location) and treated as an array of arrays.
483 	 * All sites in an interval are printed as one line of text. */
484 	public static final int INTERVAL=200;
485 	public static long readsProcessed=0;
486 	public static long sitesProcessed=0;
487 	public static long sitesOut=0;
488 	public static boolean DELETE_TEMP=true;
489 	public static final String DEFAULT_TEMP_PATTERN="StackSites2TempFile_#.txt.gz";
490 	/** Start incrementing coverage this far in from the site tips. */
491 	public static int PCOV_TIP_DIST=6;
492 
493 	/** Toss sites from areas with less than this coverage, since they can't be used to call vars */
494 	public static int MIN_COV_TO_RETAIN=2;
495 	/** Toss sites from areas with less than this coverage, since they can't be used to call vars */
496 	public static int MIN_PCOV_TO_TOSS=3;
497 
498 	private static final CoverageArray FAKE=new CoverageArray2(-1, 500);
499 }
500