1 package clump;
2 
3 import java.io.PrintStream;
4 import java.util.ArrayList;
5 import java.util.LinkedHashMap;
6 import java.util.LinkedList;
7 import java.util.Map.Entry;
8 import java.util.concurrent.atomic.AtomicInteger;
9 
10 import dna.AminoAcid;
11 import shared.Shared;
12 import shared.Tools;
13 import stream.Read;
14 import structures.LongM;
15 
16 /**
17  * A list of clumps, meaning a list of lists of reads.
18  * Allows adding reads by streaming and generating new clumps as needed.
19  * The input reads must be correctly ordered.
20  * @author Brian Bushnell
21  * @date Nov 9, 2015
22  *
23  */
24 public class ClumpList extends ArrayList<Clump> {
25 
ClumpList(int k_)26 	public ClumpList(int k_){
27 		this(null, k_, false);
28 	}
29 
30 	//TODO: Add flag to remove duplicates, and remove optical duplicates
ClumpList(ArrayList<Read> list, int k_, boolean makeSimpleConsensus_)31 	public ClumpList(ArrayList<Read> list, int k_, boolean makeSimpleConsensus_){
32 		k=k_;
33 		makeSimpleConsensus=makeSimpleConsensus_;
34 		if(list!=null){addReads(list);}
35 //		Shared.sort(this);//Does nothing since reads are already sorted by kmer.
36 	}
37 
addReads(ArrayList<Read> list)38 	public void addReads(ArrayList<Read> list){
39 		assert(list.getClass()!=Clump.class) : list.getClass();
40 		if(list.size()<100000 || Shared.threads()<2){addReadsST(list);}
41 		else{addReadsMT(list);}
42 
43 //		System.err.println(addedR+", "+addedC+", "+list.size());
44 	}
45 
reorderPaired()46 	public void reorderPaired(){//More efficient but requires unpair and repair, and paired reads.
47 		ArrayList<Clump> temp=new ArrayList<Clump>(size());
48 		for(Clump c : this){
49 			if(!c.added){
50 				temp.add(c);
51 				c.added=true;
52 				addFriends(c, temp, 0);
53 			}
54 		}
55 		assert(temp.size()==size());
56 		super.clear();
57 		this.addAll(temp);
58 	}
59 
addFriends(Clump c, ArrayList<Clump> temp, int depth)60 	private void addFriends(Clump c, ArrayList<Clump> temp, int depth){
61 		LinkedList<Clump> q=(depth<100 ? new LinkedList<Clump>() : null);
62 		for(Read r : c){
63 			Read r2=r.mate;
64 			if(r2!=null){
65 				ReadKey rk=(ReadKey)r2.obj;
66 				Clump c2=rk.clump;
67 				if(!c2.added){
68 					temp.add(c2);
69 					c2.added=true;
70 					if(q!=null){
71 						q.addFirst(c2);
72 //						addFriends(c2, temp, depth+1);//Not as good.
73 					}
74 				}
75 			}
76 		}
77 		while(q!=null && !q.isEmpty()){
78 			addFriends(q.pollLast(), temp, depth+1);
79 		}
80 	}
81 
reorder()82 	public void reorder(){//Less efficient but works with unpaired reads
83 		ArrayList<Clump> temp=new ArrayList<Clump>(size());
84 //		temp.addAll(this);
85 		LinkedHashMap<LongM, ArrayList<Clump>> map=map();
86 		LinkedList<Clump> q=new LinkedList<Clump>();
87 
88 		final LongM key=new LongM();
89 		for(Entry<LongM, ArrayList<Clump>> e : map.entrySet()){
90 			ArrayList<Clump> list=e.getValue();
91 			if(list.size()>1){
92 				Clump c0=list.get(0);
93 				if(!c0.added){
94 					temp.add(c0);
95 					c0.added=true;
96 				}
97 				for(int i=1; i<list.size(); i++){
98 					Clump c=list.get(i);
99 					if(!c.added){
100 						q.addFirst(c);
101 						c.added=true;
102 					}
103 				}
104 				while(!q.isEmpty()){
105 					Clump c=q.pollLast();
106 					temp.add(c);
107 					key.set(c.kmer);
108 					ArrayList<Clump> list2=map.get(key);
109 					if(list2.size()>1){
110 						for(int i=1; i<list2.size(); i++){
111 							Clump c2=list2.get(i);
112 							if(!c2.added){
113 								c2.added=true;
114 								q.addFirst(c2);
115 							}
116 						}
117 					}
118 				}
119 			}
120 		}
121 		for(Clump c : this){
122 			if(!c.added){
123 				c.added=true;
124 				temp.add(c);
125 			}
126 		}
127 		assert(temp.size()==size()) : temp.size()+", "+this.size();
128 		this.clear();
129 		this.addAll(temp);
130 	}
131 
addReadsMT(ArrayList<Read> list)132 	public void addReadsMT(ArrayList<Read> list){
133 		int threads=Tools.mid(2, Shared.threads()/2, 16);
134 //		assert(false) : threads;
135 		final ArrayList<ClumpThread> alct=new ArrayList<ClumpThread>(threads);
136 		int incr=((list.size()+threads-1)/threads);
137 		if(incr*threads<list.size()-1){incr++;}
138 		assert(threads*incr>=list.size()) : threads+", "+incr+", "+list.size();
139 		for(int i=0; i<threads; i++){
140 			alct.add(new ClumpThread(list, i*incr, (i+1)*incr, k, makeSimpleConsensus));
141 		}
142 
143 		if(verbose){outstream.println("Starting clump threads.");}
144 		for(ClumpThread ct : alct){ct.start();}
145 
146 		if(verbose){outstream.println("Waiting for threads.");}
147 		/* Wait for threads to die */
148 		for(ClumpThread ct : alct){
149 
150 			/* Wait for a thread to die */
151 			while(ct.getState()!=Thread.State.TERMINATED){
152 				try {
153 					ct.join();
154 				} catch (InterruptedException e) {
155 					e.printStackTrace();
156 				}
157 			}
158 			addAll(ct.storage);
159 			cAdded+=ct.cAddedT;
160 			rAdded+=ct.rAddedT;
161 		}
162 		assert(list.size()==rAdded) : list.size()+"!="+rAdded;
163 //		for(int i=0; i<list.size(); i++){assert(list.get(i).doscarded()) : i;}
164 	}
165 
166 //	long addedR=0, addedC=0;
167 //
168 //	@Override
169 //	public boolean add(Clump c){
170 //		assert(c!=null && c.size()>0);
171 //		synchronized(this){
172 //			addedR+=c.size();
173 //			addedC++;
174 //		}
175 //		return super.add(c);
176 //	}
177 //
178 //	@Override
179 //	public boolean addAll(Collection<? extends Clump> cc){
180 //		for(Clump c : cc){add(c);}
181 //		return true;
182 //	}
183 
addReadsST(ArrayList<Read> list)184 	public void addReadsST(ArrayList<Read> list){
185 		Clump currentClump=null;
186 		long currentKmer=-1;
187 		for(final Read r : list){
188 			final ReadKey key=(ReadKey)r.obj;
189 			if(key.kmer!=currentKmer){
190 				if(currentClump!=null){
191 					if(makeSimpleConsensus){
192 						currentClump.consensusRead();
193 						currentClump.clearCounts();
194 					}
195 					add(currentClump);
196 				}
197 				currentKmer=key.kmer;
198 				currentClump=Clump.makeClump(key.kmer);
199 			}
200 			currentClump.add(r);
201 //			assert(!r.doscarded());
202 //			r.setDoscarded(true);
203 //			assert(r.doscarded());
204 		}
205 		if(currentClump!=null && !currentClump.isEmpty()){
206 			if(makeSimpleConsensus){
207 				currentClump.consensusRead();
208 				currentClump.clearCounts();
209 			}
210 			add(currentClump);
211 		}
212 //		for(int i=0; i<list.size(); i++){assert(list.get(i).doscarded()) : i;}
213 	}
214 
process(final int threads, final int mode, final long[] rvector)215 	public ArrayList<Read> process(final int threads, final int mode, final long[] rvector){
216 		final ArrayList<ProcessThread> alct=new ArrayList<ProcessThread>(threads);
217 		for(int i=0; i<threads; i++){alct.add(new ProcessThread(mode));}
218 
219 		if(verbose){outstream.println("Starting condense/correction threads.");}
220 		for(ProcessThread ct : alct){ct.start();}
221 
222 		if(verbose){outstream.println("Waiting for threads.");}
223 		long readsThisPass=0;
224 		long corrections=0;
225 		long duplicates=0;
226 		/* Wait for threads to die */
227 		for(ProcessThread ct : alct){
228 
229 			/* Wait for a thread to die */
230 			while(ct.getState()!=Thread.State.TERMINATED){
231 				try {
232 					ct.join();
233 				} catch (InterruptedException e) {
234 					e.printStackTrace();
235 				}
236 			}
237 			readsThisPass+=ct.storage.size();
238 			corrections+=ct.corrections;
239 			duplicates+=ct.duplicates;
240 		}
241 
242 		if(verbose){outstream.println("Gathering reads.");}
243 		ArrayList<Read> list=new ArrayList<Read>((int)readsThisPass);
244 		for(int i=0; i<threads; i++){
245 			ProcessThread ct=alct.set(i, null);
246 			list.addAll(ct.storage);
247 		}
248 
249 		rvector[0]+=corrections;
250 		rvector[1]+=duplicates;
251 //		assert(false) : duplicates+", "+Arrays.toString(rvector);
252 		assert(list.size()==readsThisPass);
253 		return list;
254 	}
255 
256 	@Override
clear()257 	public void clear(){
258 		super.clear();
259 		ptr.set(0);
260 		map=null;
261 	}
262 
map()263 	public LinkedHashMap<LongM, ArrayList<Clump>> map(){
264 		LinkedHashMap<LongM, ArrayList<Clump>> temp=map;
265 		if(temp==null){
266 			synchronized(this){
267 				if(map==null){
268 					temp=makeMap();
269 				}
270 				assert(map==null);
271 				map=temp;
272 			}
273 		}
274 		return map;
275 	}
276 
makeMap()277 	private synchronized LinkedHashMap<LongM, ArrayList<Clump>> makeMap(){
278 		assert(this.map==null);
279 		LinkedHashMap<LongM, ArrayList<Clump>> map=new LinkedHashMap<LongM, ArrayList<Clump>>(this.size());
280 		for(Clump c : this){
281 			ArrayList<Clump> list=new ArrayList<Clump>(4);
282 			list.add(c);
283 			map.put(new LongM(c.kmer), list);
284 		}
285 		fillMap(map);
286 		assert(this.map==null);
287 		return map;
288 	}
289 
fillMap(LinkedHashMap<LongM, ArrayList<Clump>> map)290 	private void fillMap(LinkedHashMap<LongM, ArrayList<Clump>> map){
291 		final int shift=2*k;
292 		final int shift2=shift-2;
293 		final long mask=(shift>63 ? -1L : ~((-1L)<<shift));
294 		final LongM key=new LongM();
295 
296 		for(Clump c : this){
297 			Read r=c.consensusRead();
298 			byte[] bases=r.bases;
299 			long kmer=0, rkmer=0;
300 			int len=0;
301 			for(int i=0; i<bases.length; i++){
302 				byte b=bases[i];
303 				long x=AminoAcid.baseToNumber[b];
304 				long x2=AminoAcid.baseToComplementNumber[b];
305 				kmer=((kmer<<2)|x)&mask;
306 				rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
307 
308 				if(x<0){
309 					len=0;
310 					kmer=rkmer=0;
311 				}else{len++;}
312 
313 				if(len>=k){
314 					final long kmax=Tools.max(kmer, rkmer);
315 					key.set(kmax);
316 					if(kmax!=c.kmer){
317 						//This assertion should be fine, but it fails on nt.
318 //						assert(kmer!=c.kmer && rkmer!=c.kmer) : kmax+", "+kmer+", "+rkmer+", "+c.kmer+", "+r.length()+", "+(r.length()<10000 ? r.toString() : "");
319 					}
320 					ArrayList<Clump> list=map.get(key);
321 					if(list!=null && !list.contains(c)){list.add(c);}
322 				}
323 			}
324 		}
325 	}
326 
327 	private class ProcessThread extends Thread{
328 
ProcessThread(int mode_)329 		public ProcessThread(int mode_){
330 			mode=mode_;
331 		}
332 
333 		@Override
run()334 		public void run(){
335 			final int size=size();
336 			for(int i=ptr.getAndIncrement(); i<size; i=ptr.getAndIncrement()){
337 				Clump c=get(i);
338 				if(mode==CONDENSE){
339 					ArrayList<Read> list=c.makeConsensus();
340 					storage.addAll(list);
341 				}else if(mode==CORRECT){
342 					corrections+=c.splitAndErrorCorrect();
343 					if(UNRCOMP){
344 						for(Read r : c){
345 							if(r.swapped()){
346 								ReadKey key=(ReadKey)r.obj;
347 								key.flip(r, k);
348 //								r.reverseComplement();
349 //								r.setSwapped(false);
350 //								key.position=r.length()-key.position+k-2;
351 							}
352 						}
353 					}
354 					storage.addAll(c);
355 				}else if(mode==DEDUPE){
356 					duplicates+=c.removeDuplicates();
357 					storage.addAll(c);
358 				}else{
359 					throw new RuntimeException("Unknown mode "+mode);
360 				}
361 				c.clear();
362 				set(i, null);
363 			}
364 		}
365 
366 		public long corrections=0;
367 		public long duplicates=0;
368 		ArrayList<Read> storage=new ArrayList<Read>();
369 		private final int mode;
370 	}
371 
372 	private static class ClumpThread extends Thread{
373 
ClumpThread(ArrayList<Read> input_, int startIndex_, int stopIndex_, int k, boolean makeSimpleConsensus_)374 		public ClumpThread(ArrayList<Read> input_, int startIndex_, int stopIndex_, int k, boolean makeSimpleConsensus_){
375 			input=input_;
376 			startIndex=startIndex_;
377 			stopIndex=Tools.min(input.size(), stopIndex_);
378 //			outstream.println("Processing "+startIndex_+"-"+stopIndex_);
379 			storage=new ClumpList(k);
380 			makeSimpleConsensusT=makeSimpleConsensus_;
381 		}
382 
383 		@Override
run()384 		public void run(){
385 			if(startIndex>=input.size()){return;}
386 			int i=startIndex;
387 
388 			long currentKmer=-1;
389 			if(startIndex>0){
390 				Read r=input.get(i-1);
391 				final ReadKey key=(ReadKey)r.obj;
392 				currentKmer=key.kmer;
393 			}
394 			while(i<stopIndex){
395 				Read r=input.get(i);
396 				final ReadKey key=(ReadKey)r.obj;
397 				if(key.kmer!=currentKmer){
398 					if(currentClump!=null){
399 						storage.add(currentClump);
400 						cAddedT++;
401 						rAddedT+=currentClump.size();
402 					}
403 					currentKmer=key.kmer;
404 					currentClump=Clump.makeClump(key.kmer);
405 				}
406 				if(currentClump!=null){
407 					currentClump.add(r);
408 //					assert(!r.doscarded()) : +i+", "+startIndex;
409 //					r.setDoscarded(true);
410 				}
411 //				if(i==0){assert(r.doscarded()) : currentClump;}
412 				i++;
413 			}
414 			while(currentClump!=null && i<input.size()){
415 				Read r=input.get(i);
416 				final ReadKey key=(ReadKey)r.obj;
417 				if(key.kmer!=currentKmer){
418 					storage.add(currentClump);
419 					cAddedT++;
420 					rAddedT+=currentClump.size();
421 					currentClump=null;
422 					break;
423 				}else{
424 					currentClump.add(r);
425 //					assert(!r.doscarded()) : +i+", "+startIndex;
426 //					r.setDoscarded(true);
427 				}
428 				i++;
429 			}
430 			if(currentClump!=null && currentClump.size()>0){
431 				cAddedT++;
432 				rAddedT+=currentClump.size();
433 				storage.add(currentClump);
434 			}
435 			if(makeSimpleConsensusT){
436 				for(Clump c : storage){
437 					c.consensusRead();
438 					c.clearCounts();
439 				}
440 			}
441 			currentClump=null;
442 		}
443 
444 		final boolean makeSimpleConsensusT;
445 		final ArrayList<Read> input;
446 		final int startIndex;
447 		final int stopIndex;
448 		ClumpList storage;
449 		Clump currentClump;
450 		long cAddedT=0, rAddedT=0;
451 	}
452 
453 	static final int CONDENSE=1, CORRECT=2, DEDUPE=3;
454 
455 	private final boolean makeSimpleConsensus;
456 	long cAdded=0, rAdded=0;
457 
458 	final int k;
459 	final AtomicInteger ptr=new AtomicInteger(0);
460 	private LinkedHashMap<LongM, ArrayList<Clump>> map;
461 
462 	public static boolean UNRCOMP=true;
463 	private static boolean verbose=false;
464 
465 	private static final long serialVersionUID = 1L;
466 	private static final PrintStream outstream=System.err;
467 
468 }
469