1 package clump; 2 3 import java.io.PrintStream; 4 import java.util.ArrayList; 5 import java.util.LinkedHashMap; 6 import java.util.LinkedList; 7 import java.util.Map.Entry; 8 import java.util.concurrent.atomic.AtomicInteger; 9 10 import dna.AminoAcid; 11 import shared.Shared; 12 import shared.Tools; 13 import stream.Read; 14 import structures.LongM; 15 16 /** 17 * A list of clumps, meaning a list of lists of reads. 18 * Allows adding reads by streaming and generating new clumps as needed. 19 * The input reads must be correctly ordered. 20 * @author Brian Bushnell 21 * @date Nov 9, 2015 22 * 23 */ 24 public class ClumpList extends ArrayList<Clump> { 25 ClumpList(int k_)26 public ClumpList(int k_){ 27 this(null, k_, false); 28 } 29 30 //TODO: Add flag to remove duplicates, and remove optical duplicates ClumpList(ArrayList<Read> list, int k_, boolean makeSimpleConsensus_)31 public ClumpList(ArrayList<Read> list, int k_, boolean makeSimpleConsensus_){ 32 k=k_; 33 makeSimpleConsensus=makeSimpleConsensus_; 34 if(list!=null){addReads(list);} 35 // Shared.sort(this);//Does nothing since reads are already sorted by kmer. 36 } 37 addReads(ArrayList<Read> list)38 public void addReads(ArrayList<Read> list){ 39 assert(list.getClass()!=Clump.class) : list.getClass(); 40 if(list.size()<100000 || Shared.threads()<2){addReadsST(list);} 41 else{addReadsMT(list);} 42 43 // System.err.println(addedR+", "+addedC+", "+list.size()); 44 } 45 reorderPaired()46 public void reorderPaired(){//More efficient but requires unpair and repair, and paired reads. 47 ArrayList<Clump> temp=new ArrayList<Clump>(size()); 48 for(Clump c : this){ 49 if(!c.added){ 50 temp.add(c); 51 c.added=true; 52 addFriends(c, temp, 0); 53 } 54 } 55 assert(temp.size()==size()); 56 super.clear(); 57 this.addAll(temp); 58 } 59 addFriends(Clump c, ArrayList<Clump> temp, int depth)60 private void addFriends(Clump c, ArrayList<Clump> temp, int depth){ 61 LinkedList<Clump> q=(depth<100 ? new LinkedList<Clump>() : null); 62 for(Read r : c){ 63 Read r2=r.mate; 64 if(r2!=null){ 65 ReadKey rk=(ReadKey)r2.obj; 66 Clump c2=rk.clump; 67 if(!c2.added){ 68 temp.add(c2); 69 c2.added=true; 70 if(q!=null){ 71 q.addFirst(c2); 72 // addFriends(c2, temp, depth+1);//Not as good. 73 } 74 } 75 } 76 } 77 while(q!=null && !q.isEmpty()){ 78 addFriends(q.pollLast(), temp, depth+1); 79 } 80 } 81 reorder()82 public void reorder(){//Less efficient but works with unpaired reads 83 ArrayList<Clump> temp=new ArrayList<Clump>(size()); 84 // temp.addAll(this); 85 LinkedHashMap<LongM, ArrayList<Clump>> map=map(); 86 LinkedList<Clump> q=new LinkedList<Clump>(); 87 88 final LongM key=new LongM(); 89 for(Entry<LongM, ArrayList<Clump>> e : map.entrySet()){ 90 ArrayList<Clump> list=e.getValue(); 91 if(list.size()>1){ 92 Clump c0=list.get(0); 93 if(!c0.added){ 94 temp.add(c0); 95 c0.added=true; 96 } 97 for(int i=1; i<list.size(); i++){ 98 Clump c=list.get(i); 99 if(!c.added){ 100 q.addFirst(c); 101 c.added=true; 102 } 103 } 104 while(!q.isEmpty()){ 105 Clump c=q.pollLast(); 106 temp.add(c); 107 key.set(c.kmer); 108 ArrayList<Clump> list2=map.get(key); 109 if(list2.size()>1){ 110 for(int i=1; i<list2.size(); i++){ 111 Clump c2=list2.get(i); 112 if(!c2.added){ 113 c2.added=true; 114 q.addFirst(c2); 115 } 116 } 117 } 118 } 119 } 120 } 121 for(Clump c : this){ 122 if(!c.added){ 123 c.added=true; 124 temp.add(c); 125 } 126 } 127 assert(temp.size()==size()) : temp.size()+", "+this.size(); 128 this.clear(); 129 this.addAll(temp); 130 } 131 addReadsMT(ArrayList<Read> list)132 public void addReadsMT(ArrayList<Read> list){ 133 int threads=Tools.mid(2, Shared.threads()/2, 16); 134 // assert(false) : threads; 135 final ArrayList<ClumpThread> alct=new ArrayList<ClumpThread>(threads); 136 int incr=((list.size()+threads-1)/threads); 137 if(incr*threads<list.size()-1){incr++;} 138 assert(threads*incr>=list.size()) : threads+", "+incr+", "+list.size(); 139 for(int i=0; i<threads; i++){ 140 alct.add(new ClumpThread(list, i*incr, (i+1)*incr, k, makeSimpleConsensus)); 141 } 142 143 if(verbose){outstream.println("Starting clump threads.");} 144 for(ClumpThread ct : alct){ct.start();} 145 146 if(verbose){outstream.println("Waiting for threads.");} 147 /* Wait for threads to die */ 148 for(ClumpThread ct : alct){ 149 150 /* Wait for a thread to die */ 151 while(ct.getState()!=Thread.State.TERMINATED){ 152 try { 153 ct.join(); 154 } catch (InterruptedException e) { 155 e.printStackTrace(); 156 } 157 } 158 addAll(ct.storage); 159 cAdded+=ct.cAddedT; 160 rAdded+=ct.rAddedT; 161 } 162 assert(list.size()==rAdded) : list.size()+"!="+rAdded; 163 // for(int i=0; i<list.size(); i++){assert(list.get(i).doscarded()) : i;} 164 } 165 166 // long addedR=0, addedC=0; 167 // 168 // @Override 169 // public boolean add(Clump c){ 170 // assert(c!=null && c.size()>0); 171 // synchronized(this){ 172 // addedR+=c.size(); 173 // addedC++; 174 // } 175 // return super.add(c); 176 // } 177 // 178 // @Override 179 // public boolean addAll(Collection<? extends Clump> cc){ 180 // for(Clump c : cc){add(c);} 181 // return true; 182 // } 183 addReadsST(ArrayList<Read> list)184 public void addReadsST(ArrayList<Read> list){ 185 Clump currentClump=null; 186 long currentKmer=-1; 187 for(final Read r : list){ 188 final ReadKey key=(ReadKey)r.obj; 189 if(key.kmer!=currentKmer){ 190 if(currentClump!=null){ 191 if(makeSimpleConsensus){ 192 currentClump.consensusRead(); 193 currentClump.clearCounts(); 194 } 195 add(currentClump); 196 } 197 currentKmer=key.kmer; 198 currentClump=Clump.makeClump(key.kmer); 199 } 200 currentClump.add(r); 201 // assert(!r.doscarded()); 202 // r.setDoscarded(true); 203 // assert(r.doscarded()); 204 } 205 if(currentClump!=null && !currentClump.isEmpty()){ 206 if(makeSimpleConsensus){ 207 currentClump.consensusRead(); 208 currentClump.clearCounts(); 209 } 210 add(currentClump); 211 } 212 // for(int i=0; i<list.size(); i++){assert(list.get(i).doscarded()) : i;} 213 } 214 process(final int threads, final int mode, final long[] rvector)215 public ArrayList<Read> process(final int threads, final int mode, final long[] rvector){ 216 final ArrayList<ProcessThread> alct=new ArrayList<ProcessThread>(threads); 217 for(int i=0; i<threads; i++){alct.add(new ProcessThread(mode));} 218 219 if(verbose){outstream.println("Starting condense/correction threads.");} 220 for(ProcessThread ct : alct){ct.start();} 221 222 if(verbose){outstream.println("Waiting for threads.");} 223 long readsThisPass=0; 224 long corrections=0; 225 long duplicates=0; 226 /* Wait for threads to die */ 227 for(ProcessThread ct : alct){ 228 229 /* Wait for a thread to die */ 230 while(ct.getState()!=Thread.State.TERMINATED){ 231 try { 232 ct.join(); 233 } catch (InterruptedException e) { 234 e.printStackTrace(); 235 } 236 } 237 readsThisPass+=ct.storage.size(); 238 corrections+=ct.corrections; 239 duplicates+=ct.duplicates; 240 } 241 242 if(verbose){outstream.println("Gathering reads.");} 243 ArrayList<Read> list=new ArrayList<Read>((int)readsThisPass); 244 for(int i=0; i<threads; i++){ 245 ProcessThread ct=alct.set(i, null); 246 list.addAll(ct.storage); 247 } 248 249 rvector[0]+=corrections; 250 rvector[1]+=duplicates; 251 // assert(false) : duplicates+", "+Arrays.toString(rvector); 252 assert(list.size()==readsThisPass); 253 return list; 254 } 255 256 @Override clear()257 public void clear(){ 258 super.clear(); 259 ptr.set(0); 260 map=null; 261 } 262 map()263 public LinkedHashMap<LongM, ArrayList<Clump>> map(){ 264 LinkedHashMap<LongM, ArrayList<Clump>> temp=map; 265 if(temp==null){ 266 synchronized(this){ 267 if(map==null){ 268 temp=makeMap(); 269 } 270 assert(map==null); 271 map=temp; 272 } 273 } 274 return map; 275 } 276 makeMap()277 private synchronized LinkedHashMap<LongM, ArrayList<Clump>> makeMap(){ 278 assert(this.map==null); 279 LinkedHashMap<LongM, ArrayList<Clump>> map=new LinkedHashMap<LongM, ArrayList<Clump>>(this.size()); 280 for(Clump c : this){ 281 ArrayList<Clump> list=new ArrayList<Clump>(4); 282 list.add(c); 283 map.put(new LongM(c.kmer), list); 284 } 285 fillMap(map); 286 assert(this.map==null); 287 return map; 288 } 289 fillMap(LinkedHashMap<LongM, ArrayList<Clump>> map)290 private void fillMap(LinkedHashMap<LongM, ArrayList<Clump>> map){ 291 final int shift=2*k; 292 final int shift2=shift-2; 293 final long mask=(shift>63 ? -1L : ~((-1L)<<shift)); 294 final LongM key=new LongM(); 295 296 for(Clump c : this){ 297 Read r=c.consensusRead(); 298 byte[] bases=r.bases; 299 long kmer=0, rkmer=0; 300 int len=0; 301 for(int i=0; i<bases.length; i++){ 302 byte b=bases[i]; 303 long x=AminoAcid.baseToNumber[b]; 304 long x2=AminoAcid.baseToComplementNumber[b]; 305 kmer=((kmer<<2)|x)&mask; 306 rkmer=((rkmer>>>2)|(x2<<shift2))&mask; 307 308 if(x<0){ 309 len=0; 310 kmer=rkmer=0; 311 }else{len++;} 312 313 if(len>=k){ 314 final long kmax=Tools.max(kmer, rkmer); 315 key.set(kmax); 316 if(kmax!=c.kmer){ 317 //This assertion should be fine, but it fails on nt. 318 // assert(kmer!=c.kmer && rkmer!=c.kmer) : kmax+", "+kmer+", "+rkmer+", "+c.kmer+", "+r.length()+", "+(r.length()<10000 ? r.toString() : ""); 319 } 320 ArrayList<Clump> list=map.get(key); 321 if(list!=null && !list.contains(c)){list.add(c);} 322 } 323 } 324 } 325 } 326 327 private class ProcessThread extends Thread{ 328 ProcessThread(int mode_)329 public ProcessThread(int mode_){ 330 mode=mode_; 331 } 332 333 @Override run()334 public void run(){ 335 final int size=size(); 336 for(int i=ptr.getAndIncrement(); i<size; i=ptr.getAndIncrement()){ 337 Clump c=get(i); 338 if(mode==CONDENSE){ 339 ArrayList<Read> list=c.makeConsensus(); 340 storage.addAll(list); 341 }else if(mode==CORRECT){ 342 corrections+=c.splitAndErrorCorrect(); 343 if(UNRCOMP){ 344 for(Read r : c){ 345 if(r.swapped()){ 346 ReadKey key=(ReadKey)r.obj; 347 key.flip(r, k); 348 // r.reverseComplement(); 349 // r.setSwapped(false); 350 // key.position=r.length()-key.position+k-2; 351 } 352 } 353 } 354 storage.addAll(c); 355 }else if(mode==DEDUPE){ 356 duplicates+=c.removeDuplicates(); 357 storage.addAll(c); 358 }else{ 359 throw new RuntimeException("Unknown mode "+mode); 360 } 361 c.clear(); 362 set(i, null); 363 } 364 } 365 366 public long corrections=0; 367 public long duplicates=0; 368 ArrayList<Read> storage=new ArrayList<Read>(); 369 private final int mode; 370 } 371 372 private static class ClumpThread extends Thread{ 373 ClumpThread(ArrayList<Read> input_, int startIndex_, int stopIndex_, int k, boolean makeSimpleConsensus_)374 public ClumpThread(ArrayList<Read> input_, int startIndex_, int stopIndex_, int k, boolean makeSimpleConsensus_){ 375 input=input_; 376 startIndex=startIndex_; 377 stopIndex=Tools.min(input.size(), stopIndex_); 378 // outstream.println("Processing "+startIndex_+"-"+stopIndex_); 379 storage=new ClumpList(k); 380 makeSimpleConsensusT=makeSimpleConsensus_; 381 } 382 383 @Override run()384 public void run(){ 385 if(startIndex>=input.size()){return;} 386 int i=startIndex; 387 388 long currentKmer=-1; 389 if(startIndex>0){ 390 Read r=input.get(i-1); 391 final ReadKey key=(ReadKey)r.obj; 392 currentKmer=key.kmer; 393 } 394 while(i<stopIndex){ 395 Read r=input.get(i); 396 final ReadKey key=(ReadKey)r.obj; 397 if(key.kmer!=currentKmer){ 398 if(currentClump!=null){ 399 storage.add(currentClump); 400 cAddedT++; 401 rAddedT+=currentClump.size(); 402 } 403 currentKmer=key.kmer; 404 currentClump=Clump.makeClump(key.kmer); 405 } 406 if(currentClump!=null){ 407 currentClump.add(r); 408 // assert(!r.doscarded()) : +i+", "+startIndex; 409 // r.setDoscarded(true); 410 } 411 // if(i==0){assert(r.doscarded()) : currentClump;} 412 i++; 413 } 414 while(currentClump!=null && i<input.size()){ 415 Read r=input.get(i); 416 final ReadKey key=(ReadKey)r.obj; 417 if(key.kmer!=currentKmer){ 418 storage.add(currentClump); 419 cAddedT++; 420 rAddedT+=currentClump.size(); 421 currentClump=null; 422 break; 423 }else{ 424 currentClump.add(r); 425 // assert(!r.doscarded()) : +i+", "+startIndex; 426 // r.setDoscarded(true); 427 } 428 i++; 429 } 430 if(currentClump!=null && currentClump.size()>0){ 431 cAddedT++; 432 rAddedT+=currentClump.size(); 433 storage.add(currentClump); 434 } 435 if(makeSimpleConsensusT){ 436 for(Clump c : storage){ 437 c.consensusRead(); 438 c.clearCounts(); 439 } 440 } 441 currentClump=null; 442 } 443 444 final boolean makeSimpleConsensusT; 445 final ArrayList<Read> input; 446 final int startIndex; 447 final int stopIndex; 448 ClumpList storage; 449 Clump currentClump; 450 long cAddedT=0, rAddedT=0; 451 } 452 453 static final int CONDENSE=1, CORRECT=2, DEDUPE=3; 454 455 private final boolean makeSimpleConsensus; 456 long cAdded=0, rAdded=0; 457 458 final int k; 459 final AtomicInteger ptr=new AtomicInteger(0); 460 private LinkedHashMap<LongM, ArrayList<Clump>> map; 461 462 public static boolean UNRCOMP=true; 463 private static boolean verbose=false; 464 465 private static final long serialVersionUID = 1L; 466 private static final PrintStream outstream=System.err; 467 468 } 469