1 package cardinality; 2 3 import java.io.File; 4 import java.io.PrintStream; 5 import java.util.ArrayList; 6 import java.util.Arrays; 7 import java.util.Random; 8 import java.util.concurrent.atomic.AtomicIntegerArray; 9 10 import dna.AminoAcid; 11 import fileIO.FileFormat; 12 import fileIO.ReadWrite; 13 import jgi.Dedupe; 14 import shared.Parse; 15 import shared.Parser; 16 import shared.Primes; 17 import shared.ReadStats; 18 import shared.Shared; 19 import shared.Timer; 20 import shared.Tools; 21 import stream.ConcurrentGenericReadInputStream; 22 import stream.ConcurrentReadInputStream; 23 import stream.FastaReadInputStream; 24 import stream.Read; 25 import structures.ListNum; 26 import ukmer.Kmer; 27 28 /** 29 * @author Brian Bushnell 30 * @date Sep 30, 2015 31 * 32 */ 33 public class LogLog_old { 34 35 /** Create a LogLog with default parameters */ LogLog_old()36 public LogLog_old(){ 37 this(1999, 8, 31, -1, 0); 38 } 39 40 /** Create a LogLog with parsed parameters */ LogLog_old(Parser p)41 public LogLog_old(Parser p){ 42 this(p.loglogbuckets, p.loglogbits, p.loglogk, p.loglogseed, p.loglogMinprob); 43 } 44 45 /** 46 * Create a LogLog with specified parameters 47 * @param buckets_ Number of buckets (counters) 48 * @param bits_ Bits hashed per cycle 49 * @param k_ Kmer length 50 * @param seed Random number generator seed; -1 for a random seed 51 * @param minProb_ Ignore kmers with under this probability of being correct 52 */ LogLog_old(int buckets_, int bits_, int k_, long seed, float minProb_)53 public LogLog_old(int buckets_, int bits_, int k_, long seed, float minProb_){ 54 // hashes=hashes_; 55 // if((buckets_&1)==0){buckets_=(int)Primes.primeAtLeast(buckets_);} 56 buckets=buckets_; 57 assert(Integer.bitCount(buckets)==1) : "Buckets must be a power of 2: "+buckets; 58 bucketMask=buckets-1; 59 bits=bits_; 60 k=Kmer.getKbig(k_); 61 minProb=minProb_; 62 //assert(atomic); 63 maxArrayA=(atomic ? new AtomicIntegerArray(buckets) : null); 64 maxArray=(atomic ? null : new int[buckets]); 65 steps=(63+bits)/bits; 66 tables=new long[numTables][][]; 67 for(int i=0; i<numTables; i++){ 68 tables[i]=makeCodes(steps, bits, (seed<0 ? -1 : seed+i)); 69 } 70 71 // assert(false) : "steps="+steps+", "+tables.length+", "+tables[0].length+", "+tables[0][0].length; 72 } 73 main(String[] args)74 public static void main(String[] args){ 75 LogLogWrapper llw=new LogLogWrapper(args); 76 77 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; 78 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; 79 80 llw.process(); 81 82 Read.VALIDATE_IN_CONSTRUCTOR=vic; 83 } 84 85 // public final long cardinality(boolean weighted){ 86 // double mult=0.7947388; 87 // if(weighted){mult=0.7600300;} 88 // return cardinality(mult); 89 // } 90 91 public final long cardinality(){ 92 return cardinality(0.7947388); 93 } 94 95 public final long cardinality(double mult){ 96 long sum=0; 97 //assert(atomic); 98 if(atomic){ 99 for(int i=0; i<maxArrayA.length(); i++){ 100 sum+=maxArrayA.get(i); 101 } 102 }else{ 103 for(int i=0; i<maxArray.length; i++){ 104 sum+=maxArray[i]; 105 } 106 } 107 double mean=sum/(double)buckets; 108 long cardinality=(long)((((Math.pow(2, mean)-1)*buckets*SKIPMOD))/1.258275); 109 lastCardinality=cardinality; 110 return cardinality; 111 } 112 113 public final long cardinalityH(){ 114 double sum=0; 115 for(int i=0; i<maxArrayA.length(); i++){ 116 int x=Tools.max(1, maxArrayA.get(i)); 117 sum+=1.0/x; 118 } 119 double mean=buckets/sum; 120 return (long)((Math.pow(2, mean)*buckets*SKIPMOD)); 121 } 122 123 // public long hashOld(final long value0, final long[][] table){ 124 // long value=value0, code=value0; 125 // long mask=(bits>63 ? -1L : ~((-1L)<<bits)); 126 // 127 // for(int i=0; i<steps; i++){ 128 // int x=(int)(value&mask); 129 // value>>=bits; 130 // code=Long.rotateLeft(code^table[i][x], 3); 131 // } 132 // return Long.rotateLeft(code, (int)(value0&31)); 133 // } 134 135 public long hash(final long value0, final long[][] table){ 136 long value=value0, code=0; 137 long mask=(bits>63 ? -1L : ~((-1L)<<bits)); 138 139 for(int i=0; i<steps; i++){//I could also do while value!=0 140 int x=(int)(value&mask); 141 value>>=bits; 142 code=code^table[i][x]; 143 } 144 return code; 145 } 146 147 public void add(long number){ 148 hash(number); 149 } 150 151 public void hash(Read r){ 152 if(r==null){return;} 153 if(r.length()>=k){hash(r.bases, r.quality);} 154 if(r.mateLength()>=k){hash(r.mate.bases, r.mate.quality);} 155 } 156 157 public void hash(byte[] bases, byte[] quals){ 158 if(k<32){hashSmall(bases, quals);} 159 else{hashBig(bases, quals);} 160 } 161 162 public void hashSmall(byte[] bases, byte[] quals){ 163 final int shift=2*k; 164 final int shift2=shift-2; 165 final long mask=(shift>63 ? -1L : ~((-1L)<<shift)); 166 int len=0; 167 168 long kmer=0, rkmer=0; 169 170 if(minProb>0 && quals!=null){//Debranched loop 171 assert(quals.length==bases.length) : quals.length+", "+bases.length; 172 float prob=1; 173 for(int i=0; i<bases.length; i++){ 174 byte b=bases[i]; 175 long x=AminoAcid.baseToNumber[b]; 176 long x2=AminoAcid.baseToComplementNumber[b]; 177 kmer=((kmer<<2)|x)&mask; 178 rkmer=((rkmer>>>2)|(x2<<shift2))&mask; 179 180 {//Update probability 181 byte q=quals[i]; 182 prob=prob*PROB_CORRECT[q]; 183 if(len>k){ 184 byte oldq=quals[i-k]; 185 prob=prob*PROB_CORRECT_INVERSE[oldq]; 186 } 187 } 188 if(x>=0){ 189 len++; 190 }else{ 191 len=0; 192 kmer=rkmer=0; 193 prob=1; 194 } 195 if(len>=k && prob>=minProb){ 196 add(Tools.max(kmer, rkmer)); 197 } 198 } 199 }else{ 200 201 for(int i=0; i<bases.length; i++){ 202 byte b=bases[i]; 203 long x=AminoAcid.baseToNumber[b]; 204 long x2=AminoAcid.baseToComplementNumber[b]; 205 kmer=((kmer<<2)|x)&mask; 206 rkmer=((rkmer>>>2)|(x2<<shift2))&mask; 207 208 if(x>=0){ 209 len++; 210 }else{ 211 len=0; 212 kmer=rkmer=0; 213 } 214 if(len>=k){ 215 add(Tools.max(kmer, rkmer)); 216 } 217 } 218 } 219 } 220 221 public void hashBig(byte[] bases, byte[] quals){ 222 223 Kmer kmer=getLocalKmer(); 224 int len=0; 225 float prob=1; 226 227 for(int i=0; i<bases.length; i++){ 228 byte b=bases[i]; 229 long x=Dedupe.baseToNumber[b]; 230 kmer.addRightNumeric(x); 231 if(minProb>0 && quals!=null){//Update probability 232 prob=prob*PROB_CORRECT[quals[i]]; 233 if(len>k){ 234 byte oldq=quals[i-k]; 235 prob=prob*PROB_CORRECT_INVERSE[oldq]; 236 } 237 } 238 if(AminoAcid.isFullyDefined(b)){ 239 len++; 240 }else{ 241 len=0; 242 prob=1; 243 } 244 if(len>=k && prob>=minProb){ 245 add(kmer.xor()); 246 } 247 } 248 } 249 250 public void add(LogLog_old log){ 251 if(atomic && maxArrayA!=log.maxArrayA){ 252 for(int i=0; i<buckets; i++){ 253 maxArrayA.set(maxArrayA.get(i), log.maxArrayA.get(i)); 254 } 255 }else{ 256 for(int i=0; i<buckets; i++){ 257 maxArray[i]=Tools.max(maxArray[i], log.maxArray[i]); 258 } 259 } 260 } 261 262 public void hash(final long number){ 263 if(number%SKIPMOD!=0){return;} 264 long key=number; 265 266 // int i=(int)(number%5); 267 // key=Long.rotateRight(key, 1); 268 // key=hash(key, tables[i%numTables]); 269 key=hash(key, tables[((int)number)&numTablesMask]); 270 int leading=Long.numberOfLeadingZeros(key); 271 // counts[leading]++; 272 273 if(leading<3){return;} 274 // final int bucket=(int)((number&Integer.MAX_VALUE)%buckets); 275 final int bucket=(int)(key&bucketMask); 276 277 if(atomic){ 278 int x=maxArrayA.get(bucket); 279 while(leading>x){ 280 boolean b=maxArrayA.compareAndSet(bucket, x, leading); 281 if(b){x=leading;} 282 else{x=maxArrayA.get(bucket);} 283 } 284 }else{ 285 maxArray[bucket]=Tools.max(leading, maxArray[bucket]); 286 } 287 } 288 289 private static long[][] makeCodes(int length, int bits, long seed){ 290 Random randy=Shared.threadLocalRandom(seed); 291 int modes=1<<bits; 292 long[][] r=new long[length][modes]; 293 for(int i=0; i<length; i++){ 294 for(int j=0; j<modes; j++){ 295 long x=randy.nextLong(); 296 while(Long.bitCount(x)>33){ 297 x&=(~(1L<<randy.nextInt(64))); 298 } 299 while(Long.bitCount(x)<31){ 300 x|=(1L<<randy.nextInt(64)); 301 } 302 r[i][j]=x; 303 304 } 305 } 306 return r; 307 } 308 309 public final int k; 310 public final int numTables=4; 311 public final int numTablesMask=numTables-1; 312 public final int bits; 313 public final float minProb; 314 // public final int hashes; 315 public final int steps; 316 private final long[][][] tables; 317 public final AtomicIntegerArray maxArrayA; 318 public final int[] maxArray; 319 public final int buckets; 320 public final int bucketMask; 321 private final ThreadLocal<Kmer> localKmer=new ThreadLocal<Kmer>(); 322 getLocalKmer()323 protected Kmer getLocalKmer(){ 324 Kmer kmer=localKmer.get(); 325 if(kmer==null){ 326 localKmer.set(new Kmer(k)); 327 kmer=localKmer.get(); 328 } 329 kmer.clearFast(); 330 return kmer; 331 } 332 333 private static class LogLogWrapper{ 334 LogLogWrapper(String[] args)335 public LogLogWrapper(String[] args){ 336 337 Shared.capBufferLen(200); 338 Shared.capBuffers(8); 339 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; 340 ReadWrite.MAX_ZIP_THREADS=Shared.threads(); 341 342 343 Parser parser=new Parser(); 344 for(int i=0; i<args.length; i++){ 345 String arg=args[i]; 346 String[] split=arg.split("="); 347 String a=split[0].toLowerCase(); 348 String b=split.length>1 ? split[1] : null; 349 350 if(parser.parse(arg, a, b)){ 351 //do nothing 352 }else if(a.equals("buckets") || a.equals("loglogbuckets")){ 353 long x=Parse.parseKMG(b); 354 buckets=(int)Primes.primeAtLeast(Tools.min(1000000, x)); 355 }else if(a.equals("bits") || a.equals("loglogbits")){ 356 bits=Integer.parseInt(b); 357 }else if(a.equals("k") || a.equals("loglogk")){ 358 k=Integer.parseInt(b); 359 }else if(a.equals("seed") || a.equals("loglogseed")){ 360 seed=Long.parseLong(b); 361 }else if(a.equals("minprob") || a.equals("loglogminprob")){ 362 minProb=Float.parseFloat(b); 363 }else if(a.equals("verbose")){ 364 verbose=Parse.parseBoolean(b); 365 }else if(a.equals("atomic")){ 366 assert(false) : "Atomic flag disabled."; 367 // atomic=Parse.parseBoolean(b); 368 }else if(a.equals("parse_flag_goes_here")){ 369 //Set a variable here 370 }else if(in1==null && i==0 && !arg.contains("=") && (arg.toLowerCase().startsWith("stdin") || new File(arg).exists())){ 371 parser.in1=b; 372 }else{ 373 outstream.println("Unknown parameter "+args[i]); 374 assert(false) : "Unknown parameter "+args[i]; 375 // throw new RuntimeException("Unknown parameter "+args[i]); 376 } 377 } 378 379 {//Process parser fields 380 Parser.processQuality(); 381 382 maxReads=parser.maxReads; 383 384 overwrite=ReadStats.overwrite=parser.overwrite; 385 append=ReadStats.append=parser.append; 386 387 in1=(parser.in1==null ? null : parser.in1.split(",")); 388 in2=(parser.in2==null ? null : parser.in2.split(",")); 389 out=parser.out1; 390 } 391 392 assert(in1!=null && in1.length>0) : "No primary input file specified."; 393 { 394 ffin1=new FileFormat[in1.length]; 395 ffin2=new FileFormat[in1.length]; 396 397 for(int i=0; i<in1.length; i++){ 398 String a=in1[i]; 399 String b=(in2!=null && in2.length>i ? in2[i] : null); 400 assert(a!=null) : "Null input filename."; 401 if(b==null && a.indexOf('#')>-1 && !new File(a).exists()){ 402 b=a.replace("#", "2"); 403 a=a.replace("#", "1"); 404 } 405 406 ffin1[i]=FileFormat.testInput(a, FileFormat.FASTQ, null, true, true); 407 ffin2[i]=FileFormat.testInput(b, FileFormat.FASTQ, null, true, true); 408 } 409 } 410 411 assert(FastaReadInputStream.settingsOK()); 412 } 413 414 process()415 void process(){ 416 Timer t=new Timer(); 417 418 LogLog_old log=new LogLog_old(buckets, bits, k, seed, minProb); 419 420 for(int ffnum=0; ffnum<ffin1.length; ffnum++){ 421 ConcurrentReadInputStream cris=ConcurrentGenericReadInputStream.getReadInputStream(maxReads, false, ffin1[ffnum], ffin2[ffnum]); 422 cris.start(); 423 424 LogLogThread[] threads=new LogLogThread[Shared.threads()]; 425 for(int i=0; i<threads.length; i++){ 426 threads[i]=new LogLogThread((atomic ? log : new LogLog_old(buckets, bits, k, seed, minProb)), cris); 427 } 428 for(LogLogThread llt : threads){ 429 llt.start(); 430 } 431 for(LogLogThread llt : threads){ 432 while(llt.getState()!=Thread.State.TERMINATED){ 433 try { 434 llt.join(); 435 } catch (InterruptedException e) { 436 // TODO Auto-generated catch block 437 e.printStackTrace(); 438 } 439 } 440 if(!atomic){log.add(llt.log);} 441 } 442 443 errorState|=ReadWrite.closeStreams(cris); 444 } 445 446 final int[] max=new int[buckets]; 447 if(atomic){ 448 for(int i=0; i<log.maxArrayA.length(); i++){ 449 // System.err.println(log.maxArray.get(i)); 450 max[i]=log.maxArrayA.get(i); 451 } 452 } 453 454 t.stop(); 455 456 457 long cardinality=log.cardinality(); 458 459 if(out!=null){ 460 ReadWrite.writeString(cardinality+"\n", out); 461 } 462 463 // Arrays.sort(copy); 464 // System.err.println("Median: "+copy[Tools.median(copy)]); 465 466 // System.err.println("Mean: "+Tools.mean(copy)); 467 // System.err.println("Harmonic Mean: "+Tools.harmonicMean(copy)); 468 System.err.println("Cardinality: "+log.cardinality()); 469 // System.err.println("CardinalityH: "+log.cardinalityH()); 470 471 // for(long i : log.counts){System.err.println(i);} 472 473 System.err.println("Time: \t"+t); 474 } 475 476 private class LogLogThread extends Thread{ 477 LogLogThread(LogLog_old log_, ConcurrentReadInputStream cris_)478 LogLogThread(LogLog_old log_, ConcurrentReadInputStream cris_){ 479 log=log_; 480 cris=cris_; 481 } 482 483 @Override run()484 public void run(){ 485 ListNum<Read> ln=cris.nextList(); 486 ArrayList<Read> reads=(ln!=null ? ln.list : null); 487 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning 488 489 for(Read r : reads){ 490 // if(!r.validated()){r.validate(true);} 491 // if(r.mate!=null && !r.mate.validated()){r.mate.validate(true);} 492 log.hash(r); 493 } 494 495 cris.returnList(ln); 496 ln=cris.nextList(); 497 reads=(ln!=null ? ln.list : null); 498 } 499 cris.returnList(ln); 500 } 501 502 private final LogLog_old log; 503 private final ConcurrentReadInputStream cris; 504 505 } 506 507 /*--------------------------------------------------------------*/ 508 /*---------------- Fields ----------------*/ 509 /*--------------------------------------------------------------*/ 510 511 private int buckets=2048;//1999 512 private int bits=8; 513 private int k=31; 514 private long seed=-1; 515 private float minProb=0; 516 517 518 private String[] in1=null; 519 private String[] in2=null; 520 private String out=null; 521 522 /*--------------------------------------------------------------*/ 523 524 protected long readsProcessed=0; 525 protected long basesProcessed=0; 526 527 private long maxReads=-1; 528 529 boolean overwrite=false; 530 boolean append=false; 531 boolean errorState=false; 532 533 /*--------------------------------------------------------------*/ 534 /*---------------- Final Fields ----------------*/ 535 /*--------------------------------------------------------------*/ 536 537 private final FileFormat[] ffin1; 538 private final FileFormat[] ffin2; 539 540 /*--------------------------------------------------------------*/ 541 /*---------------- Common Fields ----------------*/ 542 /*--------------------------------------------------------------*/ 543 } 544 545 public static final float[] PROB_CORRECT=Arrays.copyOf(align2.QualityTools.PROB_CORRECT, 128); 546 public static final float[] PROB_CORRECT_INVERSE=Arrays.copyOf(align2.QualityTools.PROB_CORRECT_INVERSE, 128); 547 548 private static PrintStream outstream=System.err; 549 public static boolean verbose=false; 550 public static final boolean atomic=true; 551 private static final long SKIPMOD=3; 552 public static long lastCardinality=-1; 553 554 } 555