1 package cardinality;
2 
3 import java.io.File;
4 import java.io.PrintStream;
5 import java.util.ArrayList;
6 import java.util.Random;
7 
8 import dna.AminoAcid;
9 import fileIO.FileFormat;
10 import fileIO.ReadWrite;
11 import shared.Parse;
12 import shared.Parser;
13 import shared.ReadStats;
14 import shared.Shared;
15 import shared.Timer;
16 import stream.ConcurrentGenericReadInputStream;
17 import stream.ConcurrentReadInputStream;
18 import stream.FastaReadInputStream;
19 import stream.Read;
20 import structures.ListNum;
21 import structures.LongList;
22 
23 class LogLogWrapper {
24 
main(String[] args)25 	public static final void main(String[] args){
26 		LogLogWrapper llw=new LogLogWrapper(args);
27 
28 		final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR;
29 		Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4;
30 
31 		Timer t=new Timer();
32 		LongList results=new LongList();
33 		for(int i=0; i<llw.trials; i++){
34 			long card=llw.process();
35 			results.add(card);
36 		}
37 
38 		if(llw.trials>1){
39 			results.sort();
40 
41 			long kmers=llw.kmersProcessed;
42 			double mean=results.mean();
43 			double hmean=results.harmonicMean();
44 			double gmean=results.geometricMean();
45 			final double div=hmean;
46 //			long expected=(150-31+1)*llw.maxReads;
47 //			if(!llw.synth){expected=(long)mean;}
48 
49 			long median=results.median();
50 
51 
52 
53 			long min=results.min();
54 			long max=results.max();
55 			long p05=results.percentile(0.05);
56 			long p95=results.percentile(0.95);
57 			double stdev=results.stdev();
58 			double avgDif=results.avgDif(mean);
59 //			double rmsDif=results.rmsDif(mean);
60 
61 			double range=(p95-p05)/div;
62 
63 //			System.err.println(stdev);
64 //			System.err.println(rmsDif);
65 
66 			stdev=stdev/div;
67 			avgDif=avgDif/div;
68 //			rmsDif=rmsDif/mean;
69 
70 			/* Testing indicates that more buckets and fewer trials is more accurate. */
71 			/* For example, 8 buckets 256 trials < 32 buckets 64 trials < 256 buckets 8 trials, */
72 			/* from the standpoint of minimizing the standard deviation of the hmean of the estimates over 45 reps. */
73 
74 			t.stopAndPrint();
75 			System.err.println("#Trials\tkmers\thmean\tmedian\tmin\tmax\t5%ile\t95%ile\trange\tstdev\tavgDif");
76 			System.err.println(llw.trials+"\t"+kmers+"\t"+String.format("%.2f", hmean)+"\t"+median+"\t"+min+"\t"+max
77 					+"\t"+p05+"\t"+p95+"\t"+String.format("%.5f", range)+"\t"+String.format("%.5f", stdev)+"\t"+String.format("%.5f", avgDif));
78 
79 			if(CardinalityTracker.trackCounts){
80 				System.err.println("Avg Count:\t"+String.format("%.4f", llw.countSum/(llw.trials*(double)llw.buckets)));
81 			}
82 //			System.err.println("#Trials\tkmers\tmean\thmean\tgmean\tmedian\tmin\tmax\t5%ile\t95%ile\trange\tstdev\tavgDif");
83 //			System.err.println(llw.trials+"\t"+kmers+"\t"+String.format("%.2f", mean)+"\t"+String.format("%.2f", hmean)+"\t"+String.format("%.2f", gmean)+"\t"+median+"\t"+min+"\t"+max
84 //					+"\t"+p05+"\t"+p95+"\t"+String.format("%.5f", range)+"\t"+String.format("%.5f", stdev)+"\t"+String.format("%.5f", avgDif));
85 
86 		}
87 
88 		Read.VALIDATE_IN_CONSTRUCTOR=vic;
89 	}
90 
91 	public LogLogWrapper(String[] args){
92 
93 		Shared.capBufferLen(200);
94 		Shared.capBuffers(8);
95 		ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true;
96 		ReadWrite.MAX_ZIP_THREADS=Shared.threads();
97 
98 		Parser parser=new Parser();
99 		for(int i=0; i<args.length; i++){
100 			String arg=args[i];
101 			String[] split=arg.split("=");
102 			String a=split[0].toLowerCase();
103 			String b=split.length>1 ? split[1] : null;
104 
105 			if(a.equals("buckets") || a.equals("loglogbuckets")){
106 				buckets=Parse.parseIntKMG(b);
107 			}else if(a.equals("k") || a.equals("loglogk")){
108 				k=Integer.parseInt(b);
109 			}else if(a.equals("seed") || a.equals("loglogseed")){
110 				seed=Long.parseLong(b);
111 			}else if(a.equals("seed2")){
112 				seed2=Long.parseLong(b);
113 			}else if(a.equals("minprob") || a.equals("loglogminprob")){
114 				minProb=Float.parseFloat(b);
115 			}else if(a.equals("synth")){
116 				synth=Parse.parseBoolean(b);
117 			}else if(a.equals("trials")){
118 				trials=Parse.parseIntKMG(b);
119 			}else if(a.equals("verbose")){
120 				verbose=Parse.parseBoolean(b);
121 			}else if(a.equals("loglogcounts") || a.equals("loglogcount") ||
122 					a.equals("count") || a.equals("counts") || a.equals("trackcounts")){
123 				CardinalityTracker.trackCounts=Parse.parseBoolean(b);
124 			}else if(a.equals("atomic")){
125 				assert(false) : "Atomic flag disabled.";
126 //				CardinalityTracker.atomic=Parse.parseBoolean(b);
127 			}else if(parser.parse(arg, a, b)){
128 				//do nothing
129 			}else if(a.equals("parse_flag_goes_here")){
130 				//Set a variable here
131 			}else if(in1==null && i==0 && !arg.contains("=") && (arg.toLowerCase().startsWith("stdin") || new File(arg).exists())){
132 				parser.in1=b;
133 			}
134 
135 			else{
136 				outstream.println("Unknown parameter "+args[i]);
137 				assert(false) : "Unknown parameter "+args[i];
138 				//				throw new RuntimeException("Unknown parameter "+args[i]);
139 			}
140 		}
141 
142 		{//Process parser fields
143 			Parser.processQuality();
144 
145 			maxReads=parser.maxReads;
146 
147 			overwrite=ReadStats.overwrite=parser.overwrite;
148 			append=ReadStats.append=parser.append;
149 
150 			in1=(parser.in1==null ? null : parser.in1.split(","));
151 			in2=(parser.in2==null ? null : parser.in2.split(","));
152 			out=parser.out1;
153 		}
154 
155 		assert(synth || (in1!=null && in1.length>0)) : "No primary input file specified.";
156 		if(synth){
157 			ffin1=ffin2=null;
158 		}else{
159 			ffin1=new FileFormat[in1.length];
160 			ffin2=new FileFormat[in1.length];
161 
162 			for(int i=0; i<in1.length; i++){
163 				String a=in1[i];
164 				String b=(in2!=null && in2.length>i ? in2[i] : null);
165 				assert(a!=null) : "Null input filename.";
166 				if(b==null && a.indexOf('#')>-1 && !new File(a).exists()){
167 					b=a.replace("#", "2");
168 					a=a.replace("#", "1");
169 				}
170 
171 				ffin1[i]=FileFormat.testInput(a, FileFormat.FASTQ, null, true, true);
172 				ffin2[i]=FileFormat.testInput(b, FileFormat.FASTQ, null, true, true);
173 			}
174 		}
175 
176 		threads=Shared.threads();
177 		assert(FastaReadInputStream.settingsOK());
178 	}
179 
180 
process()181 	long process(){
182 		Timer t=new Timer();
183 		readsProcessed=basesProcessed=kmersProcessed=0;
184 
185 		CardinalityTracker log=CardinalityTracker.makeTracker(buckets, k, seed, minProb);
186 
187 		for(int ffnum=0, max=(synth ? 1 : ffin1.length); ffnum<max; ffnum++){
188 			ConcurrentReadInputStream cris=null;
189 			if(!synth){
190 				cris=ConcurrentGenericReadInputStream.getReadInputStream(maxReads, false, ffin1[ffnum], ffin2[ffnum]);
191 				cris.start();
192 			}
193 
194 			LogLogThread[] threadArray=new LogLogThread[threads];
195 			for(int tid=0; tid<threadArray.length; tid++){
196 				threadArray[tid]=new LogLogThread((CardinalityTracker.atomic ? log : CardinalityTracker.makeTracker(buckets, k, seed, minProb)), cris, tid);
197 			}
198 			for(LogLogThread llt : threadArray){
199 				llt.start();
200 			}
201 			for(LogLogThread llt : threadArray){
202 				while(llt.getState()!=Thread.State.TERMINATED){
203 					try {
204 						llt.join();
205 					} catch (InterruptedException e) {
206 						// TODO Auto-generated catch block
207 						e.printStackTrace();
208 					}
209 				}
210 				readsProcessed+=llt.readsProcessedT;
211 				basesProcessed+=llt.basesProcessedT;
212 				kmersProcessed+=llt.kmersProcessedT;
213 				if(!CardinalityTracker.atomic){log.add(llt.log);}
214 			}
215 
216 			if(cris!=null){errorState|=ReadWrite.closeStreams(cris);}
217 		}
218 
219 //		final int[] max=new int[buckets];
220 //		if(CardinalityTracker.atomic){
221 //			for(int i=0; i<log.maxArray.length(); i++){
222 //				//				System.err.println(log.maxArray.get(i));
223 //				max[i]=log.maxArray.get(i);
224 //			}
225 //		}
226 
227 		t.stop();
228 
229 
230 		long cardinality=log.cardinality();
231 		countSum+=(CardinalityTracker.trackCounts ? log.countSum() : 0);
232 
233 		if(out!=null){
234 			ReadWrite.writeString(cardinality+"\n", out);
235 		}
236 
237 		if(!Parser.silent) {
238 //		Arrays.sort(copy);
239 //		System.err.println("Median:        "+copy[Tools.median(copy)]);
240 
241 //		System.err.println("Mean:          "+Tools.mean(copy));
242 //		System.err.println("Harmonic Mean: "+Tools.harmonicMean(copy));
243 		System.err.println("Cardinality:   "+cardinality);
244 //		System.err.println("CardinalityH:  "+log.cardinalityH());
245 
246 //		for(long i : log.counts){System.err.println(i);}
247 
248 		System.err.println("Time: \t"+t);
249 		}
250 
251 		return cardinality;
252 	}
253 
makeRead(int len, Random randy, Read r)254 	private static Read makeRead(int len, Random randy, Read r){
255 		if(r==null || r.bases==null || r.bases.length!=len){
256 			r=new Read(null, null, 0);
257 			r.bases=new byte[len];
258 		}
259 		byte[] bases=r.bases;
260 
261 		int pos=0;
262 		final int basesPerRand=4;//Fewer calls to rand should be faster
263 		for(int max=bases.length-(bases.length%basesPerRand); pos<max;){
264 			int x=randy.nextInt()%prime;
265 			for(int i=0; i<basesPerRand; i++){
266 				int num=x&3;
267 				byte b=AminoAcid.numberToBase[num];
268 				bases[pos]=b;
269 				pos++;
270 				x>>=2;
271 			}
272 		}
273 		for(; pos<bases.length; pos++){
274 			int x=randy.nextInt()%prime;
275 			int num=x&3;
276 			byte b=AminoAcid.numberToBase[num];
277 			bases[pos]=b;
278 		}
279 		return r;
280 	}
281 
282 	/*--------------------------------------------------------------*/
283 	/*----------------        Inner Classes         ----------------*/
284 	/*--------------------------------------------------------------*/
285 
286 	private class LogLogThread extends Thread{
287 
LogLogThread(CardinalityTracker log_, ConcurrentReadInputStream cris_, int tid_)288 		LogLogThread(CardinalityTracker log_, ConcurrentReadInputStream cris_, int tid_){
289 			log=log_;
290 			cris=cris_;
291 			tid=tid_;
292 		}
293 
294 		@Override
run()295 		public void run(){
296 			if(cris!=null){runCris();}
297 			else{runSynth();}
298 		}
299 
runCris()300 		public void runCris(){
301 			final int kt=k;
302 			ListNum<Read> ln=cris.nextList();
303 			ArrayList<Read> reads=(ln!=null ? ln.list : null);
304 			while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning
305 
306 				for(Read r : reads){
307 //					if(!r.validated()){r.validate(true);}
308 //					if(r.mate!=null && !r.mate.validated()){r.mate.validate(true);}
309 					log.hash(r);
310 					readsProcessedT+=r.pairCount();
311 					basesProcessedT+=r.pairLength();
312 					kmersProcessedT+=r.numPairKmers(kt);
313 				}
314 
315 				cris.returnList(ln);
316 				ln=cris.nextList();
317 				reads=(ln!=null ? ln.list : null);
318 			}
319 			cris.returnList(ln);
320 		}
321 
runSynth()322 		public void runSynth(){
323 			final int kt=k;
324 			assert(maxReads>0 && maxReads<Long.MAX_VALUE);
325 			long readsLeft=maxReads/threads;
326 			readsLeft+=(maxReads%threads>tid ? 1 : 0);
327 			Random randy=Shared.threadLocalRandom(seed2<0 ? seed2 : seed2+999999L);
328 
329 			Read r=null;
330 			while(readsLeft>0){
331 				r=makeRead(150, randy, r);
332 				log.hash(r);
333 				readsProcessedT+=r.pairCount();
334 				basesProcessedT+=r.pairLength();
335 				kmersProcessedT+=r.numPairKmers(kt);
336 				readsLeft--;
337 			}
338 		}
339 
340 		private final CardinalityTracker log;
341 		private final ConcurrentReadInputStream cris;
342 		private final int tid;
343 
344 		protected long readsProcessedT=0;
345 		protected long basesProcessedT=0;
346 		protected long kmersProcessedT=0;
347 
348 	}
349 
350 	/*--------------------------------------------------------------*/
351 	/*----------------            Fields            ----------------*/
352 	/*--------------------------------------------------------------*/
353 
354 	private int buckets=2048;
355 	private int k=31;
356 	private long seed=-1;
357 	private long seed2=-1;
358 	private float minProb=0;
359 	private long countSum=0;
360 
361 	private String[] in1=null;
362 	private String[] in2=null;
363 	private String out=null;
364 
365 	/*--------------------------------------------------------------*/
366 
367 	protected long readsProcessed=0;
368 	protected long basesProcessed=0;
369 	protected long kmersProcessed=0;
370 
371 	private long maxReads=-1;
372 
373 	boolean overwrite=false;
374 	boolean append=false;
375 	boolean errorState=false;
376 
377 	private int trials=1;
378 	private boolean synth=false;
379 
380 	/*--------------------------------------------------------------*/
381 	/*----------------         Final Fields         ----------------*/
382 	/*--------------------------------------------------------------*/
383 
384 	private final FileFormat[] ffin1;
385 	private final FileFormat[] ffin2;
386 
387 	final int threads;
388 	private static final int prime=32452843; //A prime number
389 
390 	/*--------------------------------------------------------------*/
391 	/*----------------        Common Fields         ----------------*/
392 	/*--------------------------------------------------------------*/
393 
394 	private static PrintStream outstream=System.err;
395 	public static boolean verbose=false;
396 }
397