1 package cardinality;
2 
3 import java.io.File;
4 import java.io.PrintStream;
5 import java.util.ArrayList;
6 import java.util.Arrays;
7 import java.util.Random;
8 import java.util.concurrent.atomic.AtomicIntegerArray;
9 
10 import dna.AminoAcid;
11 import fileIO.FileFormat;
12 import fileIO.ReadWrite;
13 import jgi.Dedupe;
14 import shared.Parse;
15 import shared.Parser;
16 import shared.Primes;
17 import shared.ReadStats;
18 import shared.Shared;
19 import shared.Timer;
20 import shared.Tools;
21 import stream.ConcurrentGenericReadInputStream;
22 import stream.ConcurrentReadInputStream;
23 import stream.FastaReadInputStream;
24 import stream.Read;
25 import structures.ListNum;
26 import ukmer.Kmer;
27 
28 /**
29  * @author Brian Bushnell
30  * @date Sep 30, 2015
31  *
32  */
33 public class LogLog_old {
34 
35 	/** Create a LogLog with default parameters */
LogLog_old()36 	public LogLog_old(){
37 		this(1999, 8, 31, -1, 0);
38 	}
39 
40 	/** Create a LogLog with parsed parameters */
LogLog_old(Parser p)41 	public LogLog_old(Parser p){
42 		this(p.loglogbuckets, p.loglogbits, p.loglogk, p.loglogseed, p.loglogMinprob);
43 	}
44 
45 	/**
46 	 * Create a LogLog with specified parameters
47 	 * @param buckets_ Number of buckets (counters)
48 	 * @param bits_ Bits hashed per cycle
49 	 * @param k_ Kmer length
50 	 * @param seed Random number generator seed; -1 for a random seed
51 	 * @param minProb_ Ignore kmers with under this probability of being correct
52 	 */
LogLog_old(int buckets_, int bits_, int k_, long seed, float minProb_)53 	public LogLog_old(int buckets_, int bits_, int k_, long seed, float minProb_){
54 //		hashes=hashes_;
55 //		if((buckets_&1)==0){buckets_=(int)Primes.primeAtLeast(buckets_);}
56 		buckets=buckets_;
57 		assert(Integer.bitCount(buckets)==1) : "Buckets must be a power of 2: "+buckets;
58 		bucketMask=buckets-1;
59 		bits=bits_;
60 		k=Kmer.getKbig(k_);
61 		minProb=minProb_;
62 		//assert(atomic);
63 		maxArrayA=(atomic ? new AtomicIntegerArray(buckets) : null);
64 		maxArray=(atomic ? null : new int[buckets]);
65 		steps=(63+bits)/bits;
66 		tables=new long[numTables][][];
67 		for(int i=0; i<numTables; i++){
68 			tables[i]=makeCodes(steps, bits, (seed<0 ? -1 : seed+i));
69 		}
70 
71 //		assert(false) : "steps="+steps+", "+tables.length+", "+tables[0].length+", "+tables[0][0].length;
72 	}
73 
main(String[] args)74 	public static void main(String[] args){
75 		LogLogWrapper llw=new LogLogWrapper(args);
76 
77 		final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
78 		Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
79 
80 		llw.process();
81 
82 		Read.VALIDATE_IN_CONSTRUCTOR=vic;
83 	}
84 
85 //	public final long cardinality(boolean weighted){
86 //		double mult=0.7947388;
87 //		if(weighted){mult=0.7600300;}
88 //		return cardinality(mult);
89 //	}
90 
91 	public final long cardinality(){
92 		return cardinality(0.7947388);
93 	}
94 
95 	public final long cardinality(double mult){
96 		long sum=0;
97 		//assert(atomic);
98 		if(atomic){
99 			for(int i=0; i<maxArrayA.length(); i++){
100 				sum+=maxArrayA.get(i);
101 			}
102 		}else{
103 			for(int i=0; i<maxArray.length; i++){
104 				sum+=maxArray[i];
105 			}
106 		}
107 		double mean=sum/(double)buckets;
108 		long cardinality=(long)((((Math.pow(2, mean)-1)*buckets*SKIPMOD))/1.258275);
109 		lastCardinality=cardinality;
110 		return cardinality;
111 	}
112 
113 	public final long cardinalityH(){
114 		double sum=0;
115 		for(int i=0; i<maxArrayA.length(); i++){
116 			int x=Tools.max(1, maxArrayA.get(i));
117 			sum+=1.0/x;
118 		}
119 		double mean=buckets/sum;
120 		return (long)((Math.pow(2, mean)*buckets*SKIPMOD));
121 	}
122 
123 //	public long hashOld(final long value0, final long[][] table){
124 //		long value=value0, code=value0;
125 //		long mask=(bits>63 ? -1L : ~((-1L)<<bits));
126 //
127 //		for(int i=0; i<steps; i++){
128 //			int x=(int)(value&mask);
129 //			value>>=bits;
130 //			code=Long.rotateLeft(code^table[i][x], 3);
131 //		}
132 //		return Long.rotateLeft(code, (int)(value0&31));
133 //	}
134 
135 	public long hash(final long value0, final long[][] table){
136 		long value=value0, code=0;
137 		long mask=(bits>63 ? -1L : ~((-1L)<<bits));
138 
139 		for(int i=0; i<steps; i++){//I could also do while value!=0
140 			int x=(int)(value&mask);
141 			value>>=bits;
142 			code=code^table[i][x];
143 		}
144 		return code;
145 	}
146 
147 	public void add(long number){
148 		hash(number);
149 	}
150 
151 	public void hash(Read r){
152 		if(r==null){return;}
153 		if(r.length()>=k){hash(r.bases, r.quality);}
154 		if(r.mateLength()>=k){hash(r.mate.bases, r.mate.quality);}
155 	}
156 
157 	public void hash(byte[] bases, byte[] quals){
158 		if(k<32){hashSmall(bases, quals);}
159 		else{hashBig(bases, quals);}
160 	}
161 
162 	public void hashSmall(byte[] bases, byte[] quals){
163 		final int shift=2*k;
164 		final int shift2=shift-2;
165 		final long mask=(shift>63 ? -1L : ~((-1L)<<shift));
166 		int len=0;
167 
168 		long kmer=0, rkmer=0;
169 
170 		if(minProb>0 && quals!=null){//Debranched loop
171 			assert(quals.length==bases.length) : quals.length+", "+bases.length;
172 			float prob=1;
173 			for(int i=0; i<bases.length; i++){
174 				byte b=bases[i];
175 				long x=AminoAcid.baseToNumber[b];
176 				long x2=AminoAcid.baseToComplementNumber[b];
177 				kmer=((kmer<<2)|x)&mask;
178 				rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
179 
180 				{//Update probability
181 					byte q=quals[i];
182 					prob=prob*PROB_CORRECT[q];
183 					if(len>k){
184 						byte oldq=quals[i-k];
185 						prob=prob*PROB_CORRECT_INVERSE[oldq];
186 					}
187 				}
188 				if(x>=0){
189 					len++;
190 				}else{
191 					len=0;
192 					kmer=rkmer=0;
193 					prob=1;
194 				}
195 				if(len>=k && prob>=minProb){
196 					add(Tools.max(kmer, rkmer));
197 				}
198 			}
199 		}else{
200 
201 			for(int i=0; i<bases.length; i++){
202 				byte b=bases[i];
203 				long x=AminoAcid.baseToNumber[b];
204 				long x2=AminoAcid.baseToComplementNumber[b];
205 				kmer=((kmer<<2)|x)&mask;
206 				rkmer=((rkmer>>>2)|(x2<<shift2))&mask;
207 
208 				if(x>=0){
209 					len++;
210 				}else{
211 					len=0;
212 					kmer=rkmer=0;
213 				}
214 				if(len>=k){
215 					add(Tools.max(kmer, rkmer));
216 				}
217 			}
218 		}
219 	}
220 
221 	public void hashBig(byte[] bases, byte[] quals){
222 
223 		Kmer kmer=getLocalKmer();
224 		int len=0;
225 		float prob=1;
226 
227 		for(int i=0; i<bases.length; i++){
228 			byte b=bases[i];
229 			long x=Dedupe.baseToNumber[b];
230 			kmer.addRightNumeric(x);
231 			if(minProb>0 && quals!=null){//Update probability
232 				prob=prob*PROB_CORRECT[quals[i]];
233 				if(len>k){
234 					byte oldq=quals[i-k];
235 					prob=prob*PROB_CORRECT_INVERSE[oldq];
236 				}
237 			}
238 			if(AminoAcid.isFullyDefined(b)){
239 				len++;
240 			}else{
241 				len=0;
242 				prob=1;
243 			}
244 			if(len>=k && prob>=minProb){
245 				add(kmer.xor());
246 			}
247 		}
248 	}
249 
250 	public void add(LogLog_old log){
251 		if(atomic && maxArrayA!=log.maxArrayA){
252 			for(int i=0; i<buckets; i++){
253 				maxArrayA.set(maxArrayA.get(i), log.maxArrayA.get(i));
254 			}
255 		}else{
256 			for(int i=0; i<buckets; i++){
257 				maxArray[i]=Tools.max(maxArray[i], log.maxArray[i]);
258 			}
259 		}
260 	}
261 
262 	public void hash(final long number){
263 		if(number%SKIPMOD!=0){return;}
264 		long key=number;
265 
266 //		int i=(int)(number%5);
267 //		key=Long.rotateRight(key, 1);
268 //		key=hash(key, tables[i%numTables]);
269 		key=hash(key, tables[((int)number)&numTablesMask]);
270 		int leading=Long.numberOfLeadingZeros(key);
271 //		counts[leading]++;
272 
273 		if(leading<3){return;}
274 //		final int bucket=(int)((number&Integer.MAX_VALUE)%buckets);
275 		final int bucket=(int)(key&bucketMask);
276 
277 		if(atomic){
278 			int x=maxArrayA.get(bucket);
279 			while(leading>x){
280 				boolean b=maxArrayA.compareAndSet(bucket, x, leading);
281 				if(b){x=leading;}
282 				else{x=maxArrayA.get(bucket);}
283 			}
284 		}else{
285 			maxArray[bucket]=Tools.max(leading, maxArray[bucket]);
286 		}
287 	}
288 
289 	private static long[][] makeCodes(int length, int bits, long seed){
290 		Random randy=Shared.threadLocalRandom(seed);
291 		int modes=1<<bits;
292 		long[][] r=new long[length][modes];
293 		for(int i=0; i<length; i++){
294 			for(int j=0; j<modes; j++){
295 				long x=randy.nextLong();
296 				while(Long.bitCount(x)>33){
297 					x&=(~(1L<<randy.nextInt(64)));
298 				}
299 				while(Long.bitCount(x)<31){
300 					x|=(1L<<randy.nextInt(64));
301 				}
302 				r[i][j]=x;
303 
304 			}
305 		}
306 		return r;
307 	}
308 
309 	public final int k;
310 	public final int numTables=4;
311 	public final int numTablesMask=numTables-1;
312 	public final int bits;
313 	public final float minProb;
314 //	public final int hashes;
315 	public final int steps;
316 	private final long[][][] tables;
317 	public final AtomicIntegerArray maxArrayA;
318 	public final int[] maxArray;
319 	public final int buckets;
320 	public final int bucketMask;
321 	private final ThreadLocal<Kmer> localKmer=new ThreadLocal<Kmer>();
322 
getLocalKmer()323 	protected Kmer getLocalKmer(){
324 		Kmer kmer=localKmer.get();
325 		if(kmer==null){
326 			localKmer.set(new Kmer(k));
327 			kmer=localKmer.get();
328 		}
329 		kmer.clearFast();
330 		return kmer;
331 	}
332 
333 	private static class LogLogWrapper{
334 
LogLogWrapper(String[] args)335 		public LogLogWrapper(String[] args){
336 
337 			Shared.capBufferLen(200);
338 			Shared.capBuffers(8);
339 			ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
340 			ReadWrite.MAX_ZIP_THREADS=Shared.threads();
341 
342 
343 			Parser parser=new Parser();
344 			for(int i=0; i<args.length; i++){
345 				String arg=args[i];
346 				String[] split=arg.split("=");
347 				String a=split[0].toLowerCase();
348 				String b=split.length>1 ? split[1] : null;
349 
350 				if(parser.parse(arg, a, b)){
351 					//do nothing
352 				}else if(a.equals("buckets") || a.equals("loglogbuckets")){
353 					long x=Parse.parseKMG(b);
354 					buckets=(int)Primes.primeAtLeast(Tools.min(1000000, x));
355 				}else if(a.equals("bits") || a.equals("loglogbits")){
356 					bits=Integer.parseInt(b);
357 				}else if(a.equals("k") || a.equals("loglogk")){
358 					k=Integer.parseInt(b);
359 				}else if(a.equals("seed") || a.equals("loglogseed")){
360 					seed=Long.parseLong(b);
361 				}else if(a.equals("minprob") || a.equals("loglogminprob")){
362 					minProb=Float.parseFloat(b);
363 				}else if(a.equals("verbose")){
364 					verbose=Parse.parseBoolean(b);
365 				}else if(a.equals("atomic")){
366 					assert(false) : "Atomic flag disabled.";
367 //					atomic=Parse.parseBoolean(b);
368 				}else if(a.equals("parse_flag_goes_here")){
369 					//Set a variable here
370 				}else if(in1==null && i==0 && !arg.contains("=") && (arg.toLowerCase().startsWith("stdin") || new File(arg).exists())){
371 					parser.in1=b;
372 				}else{
373 					outstream.println("Unknown parameter "+args[i]);
374 					assert(false) : "Unknown parameter "+args[i];
375 					//				throw new RuntimeException("Unknown parameter "+args[i]);
376 				}
377 			}
378 
379 			{//Process parser fields
380 				Parser.processQuality();
381 
382 				maxReads=parser.maxReads;
383 
384 				overwrite=ReadStats.overwrite=parser.overwrite;
385 				append=ReadStats.append=parser.append;
386 
387 				in1=(parser.in1==null ? null : parser.in1.split(","));
388 				in2=(parser.in2==null ? null : parser.in2.split(","));
389 				out=parser.out1;
390 			}
391 
392 			assert(in1!=null && in1.length>0) : "No primary input file specified.";
393 			{
394 				ffin1=new FileFormat[in1.length];
395 				ffin2=new FileFormat[in1.length];
396 
397 				for(int i=0; i<in1.length; i++){
398 					String a=in1[i];
399 					String b=(in2!=null && in2.length>i ? in2[i] : null);
400 					assert(a!=null) : "Null input filename.";
401 					if(b==null && a.indexOf('#')>-1 && !new File(a).exists()){
402 						b=a.replace("#", "2");
403 						a=a.replace("#", "1");
404 					}
405 
406 					ffin1[i]=FileFormat.testInput(a, FileFormat.FASTQ, null, true, true);
407 					ffin2[i]=FileFormat.testInput(b, FileFormat.FASTQ, null, true, true);
408 				}
409 			}
410 
411 			assert(FastaReadInputStream.settingsOK());
412 		}
413 
414 
process()415 		void process(){
416 			Timer t=new Timer();
417 
418 			LogLog_old log=new LogLog_old(buckets, bits, k, seed, minProb);
419 
420 			for(int ffnum=0; ffnum<ffin1.length; ffnum++){
421 				ConcurrentReadInputStream cris=ConcurrentGenericReadInputStream.getReadInputStream(maxReads, false, ffin1[ffnum], ffin2[ffnum]);
422 				cris.start();
423 
424 				LogLogThread[] threads=new LogLogThread[Shared.threads()];
425 				for(int i=0; i<threads.length; i++){
426 					threads[i]=new LogLogThread((atomic ? log : new LogLog_old(buckets, bits, k, seed, minProb)), cris);
427 				}
428 				for(LogLogThread llt : threads){
429 					llt.start();
430 				}
431 				for(LogLogThread llt : threads){
432 					while(llt.getState()!=Thread.State.TERMINATED){
433 						try {
434 							llt.join();
435 						} catch (InterruptedException e) {
436 							// TODO Auto-generated catch block
437 							e.printStackTrace();
438 						}
439 					}
440 					if(!atomic){log.add(llt.log);}
441 				}
442 
443 				errorState|=ReadWrite.closeStreams(cris);
444 			}
445 
446 			final int[] max=new int[buckets];
447 			if(atomic){
448 				for(int i=0; i<log.maxArrayA.length(); i++){
449 					//				System.err.println(log.maxArray.get(i));
450 					max[i]=log.maxArrayA.get(i);
451 				}
452 			}
453 
454 			t.stop();
455 
456 
457 			long cardinality=log.cardinality();
458 
459 			if(out!=null){
460 				ReadWrite.writeString(cardinality+"\n", out);
461 			}
462 
463 //			Arrays.sort(copy);
464 //			System.err.println("Median:        "+copy[Tools.median(copy)]);
465 
466 //			System.err.println("Mean:          "+Tools.mean(copy));
467 //			System.err.println("Harmonic Mean: "+Tools.harmonicMean(copy));
468 			System.err.println("Cardinality:   "+log.cardinality());
469 //			System.err.println("CardinalityH:  "+log.cardinalityH());
470 
471 //			for(long i : log.counts){System.err.println(i);}
472 
473 			System.err.println("Time: \t"+t);
474 		}
475 
476 		private class LogLogThread extends Thread{
477 
LogLogThread(LogLog_old log_, ConcurrentReadInputStream cris_)478 			LogLogThread(LogLog_old log_, ConcurrentReadInputStream cris_){
479 				log=log_;
480 				cris=cris_;
481 			}
482 
483 			@Override
run()484 			public void run(){
485 				ListNum<Read> ln=cris.nextList();
486 				ArrayList<Read> reads=(ln!=null ? ln.list : null);
487 				while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
488 
489 					for(Read r : reads){
490 //						if(!r.validated()){r.validate(true);}
491 //						if(r.mate!=null && !r.mate.validated()){r.mate.validate(true);}
492 						log.hash(r);
493 					}
494 
495 					cris.returnList(ln);
496 					ln=cris.nextList();
497 					reads=(ln!=null ? ln.list : null);
498 				}
499 				cris.returnList(ln);
500 			}
501 
502 			private final LogLog_old log;
503 			private final ConcurrentReadInputStream cris;
504 
505 		}
506 
507 		/*--------------------------------------------------------------*/
508 		/*----------------            Fields            ----------------*/
509 		/*--------------------------------------------------------------*/
510 
511 		private int buckets=2048;//1999
512 		private int bits=8;
513 		private int k=31;
514 		private long seed=-1;
515 		private float minProb=0;
516 
517 
518 		private String[] in1=null;
519 		private String[] in2=null;
520 		private String out=null;
521 
522 		/*--------------------------------------------------------------*/
523 
524 		protected long readsProcessed=0;
525 		protected long basesProcessed=0;
526 
527 		private long maxReads=-1;
528 
529 		boolean overwrite=false;
530 		boolean append=false;
531 		boolean errorState=false;
532 
533 		/*--------------------------------------------------------------*/
534 		/*----------------         Final Fields         ----------------*/
535 		/*--------------------------------------------------------------*/
536 
537 		private final FileFormat[] ffin1;
538 		private final FileFormat[] ffin2;
539 
540 		/*--------------------------------------------------------------*/
541 		/*----------------        Common Fields         ----------------*/
542 		/*--------------------------------------------------------------*/
543 	}
544 
545 	public static final float[] PROB_CORRECT=Arrays.copyOf(align2.QualityTools.PROB_CORRECT, 128);
546 	public static final float[] PROB_CORRECT_INVERSE=Arrays.copyOf(align2.QualityTools.PROB_CORRECT_INVERSE, 128);
547 
548 	private static PrintStream outstream=System.err;
549 	public static boolean verbose=false;
550 	public static final boolean atomic=true;
551 	private static final long SKIPMOD=3;
552 	public static long lastCardinality=-1;
553 
554 }
555