1 package cardinality; 2 3 import java.io.File; 4 import java.io.PrintStream; 5 import java.util.ArrayList; 6 import java.util.Random; 7 8 import dna.AminoAcid; 9 import fileIO.FileFormat; 10 import fileIO.ReadWrite; 11 import shared.Parse; 12 import shared.Parser; 13 import shared.ReadStats; 14 import shared.Shared; 15 import shared.Timer; 16 import stream.ConcurrentGenericReadInputStream; 17 import stream.ConcurrentReadInputStream; 18 import stream.FastaReadInputStream; 19 import stream.Read; 20 import structures.ListNum; 21 import structures.LongList; 22 23 class LogLogWrapper { 24 main(String[] args)25 public static final void main(String[] args){ 26 LogLogWrapper llw=new LogLogWrapper(args); 27 28 final boolean vic=Read.VALIDATE_IN_CONSTRUCTOR; 29 Read.VALIDATE_IN_CONSTRUCTOR=Shared.threads()<4; 30 31 Timer t=new Timer(); 32 LongList results=new LongList(); 33 for(int i=0; i<llw.trials; i++){ 34 long card=llw.process(); 35 results.add(card); 36 } 37 38 if(llw.trials>1){ 39 results.sort(); 40 41 long kmers=llw.kmersProcessed; 42 double mean=results.mean(); 43 double hmean=results.harmonicMean(); 44 double gmean=results.geometricMean(); 45 final double div=hmean; 46 // long expected=(150-31+1)*llw.maxReads; 47 // if(!llw.synth){expected=(long)mean;} 48 49 long median=results.median(); 50 51 52 53 long min=results.min(); 54 long max=results.max(); 55 long p05=results.percentile(0.05); 56 long p95=results.percentile(0.95); 57 double stdev=results.stdev(); 58 double avgDif=results.avgDif(mean); 59 // double rmsDif=results.rmsDif(mean); 60 61 double range=(p95-p05)/div; 62 63 // System.err.println(stdev); 64 // System.err.println(rmsDif); 65 66 stdev=stdev/div; 67 avgDif=avgDif/div; 68 // rmsDif=rmsDif/mean; 69 70 /* Testing indicates that more buckets and fewer trials is more accurate. */ 71 /* For example, 8 buckets 256 trials < 32 buckets 64 trials < 256 buckets 8 trials, */ 72 /* from the standpoint of minimizing the standard deviation of the hmean of the estimates over 45 reps. */ 73 74 t.stopAndPrint(); 75 System.err.println("#Trials\tkmers\thmean\tmedian\tmin\tmax\t5%ile\t95%ile\trange\tstdev\tavgDif"); 76 System.err.println(llw.trials+"\t"+kmers+"\t"+String.format("%.2f", hmean)+"\t"+median+"\t"+min+"\t"+max 77 +"\t"+p05+"\t"+p95+"\t"+String.format("%.5f", range)+"\t"+String.format("%.5f", stdev)+"\t"+String.format("%.5f", avgDif)); 78 79 if(CardinalityTracker.trackCounts){ 80 System.err.println("Avg Count:\t"+String.format("%.4f", llw.countSum/(llw.trials*(double)llw.buckets))); 81 } 82 // System.err.println("#Trials\tkmers\tmean\thmean\tgmean\tmedian\tmin\tmax\t5%ile\t95%ile\trange\tstdev\tavgDif"); 83 // System.err.println(llw.trials+"\t"+kmers+"\t"+String.format("%.2f", mean)+"\t"+String.format("%.2f", hmean)+"\t"+String.format("%.2f", gmean)+"\t"+median+"\t"+min+"\t"+max 84 // +"\t"+p05+"\t"+p95+"\t"+String.format("%.5f", range)+"\t"+String.format("%.5f", stdev)+"\t"+String.format("%.5f", avgDif)); 85 86 } 87 88 Read.VALIDATE_IN_CONSTRUCTOR=vic; 89 } 90 91 public LogLogWrapper(String[] args){ 92 93 Shared.capBufferLen(200); 94 Shared.capBuffers(8); 95 ReadWrite.USE_PIGZ=ReadWrite.USE_UNPIGZ=true; 96 ReadWrite.MAX_ZIP_THREADS=Shared.threads(); 97 98 Parser parser=new Parser(); 99 for(int i=0; i<args.length; i++){ 100 String arg=args[i]; 101 String[] split=arg.split("="); 102 String a=split[0].toLowerCase(); 103 String b=split.length>1 ? split[1] : null; 104 105 if(a.equals("buckets") || a.equals("loglogbuckets")){ 106 buckets=Parse.parseIntKMG(b); 107 }else if(a.equals("k") || a.equals("loglogk")){ 108 k=Integer.parseInt(b); 109 }else if(a.equals("seed") || a.equals("loglogseed")){ 110 seed=Long.parseLong(b); 111 }else if(a.equals("seed2")){ 112 seed2=Long.parseLong(b); 113 }else if(a.equals("minprob") || a.equals("loglogminprob")){ 114 minProb=Float.parseFloat(b); 115 }else if(a.equals("synth")){ 116 synth=Parse.parseBoolean(b); 117 }else if(a.equals("trials")){ 118 trials=Parse.parseIntKMG(b); 119 }else if(a.equals("verbose")){ 120 verbose=Parse.parseBoolean(b); 121 }else if(a.equals("loglogcounts") || a.equals("loglogcount") || 122 a.equals("count") || a.equals("counts") || a.equals("trackcounts")){ 123 CardinalityTracker.trackCounts=Parse.parseBoolean(b); 124 }else if(a.equals("atomic")){ 125 assert(false) : "Atomic flag disabled."; 126 // CardinalityTracker.atomic=Parse.parseBoolean(b); 127 }else if(parser.parse(arg, a, b)){ 128 //do nothing 129 }else if(a.equals("parse_flag_goes_here")){ 130 //Set a variable here 131 }else if(in1==null && i==0 && !arg.contains("=") && (arg.toLowerCase().startsWith("stdin") || new File(arg).exists())){ 132 parser.in1=b; 133 } 134 135 else{ 136 outstream.println("Unknown parameter "+args[i]); 137 assert(false) : "Unknown parameter "+args[i]; 138 // throw new RuntimeException("Unknown parameter "+args[i]); 139 } 140 } 141 142 {//Process parser fields 143 Parser.processQuality(); 144 145 maxReads=parser.maxReads; 146 147 overwrite=ReadStats.overwrite=parser.overwrite; 148 append=ReadStats.append=parser.append; 149 150 in1=(parser.in1==null ? null : parser.in1.split(",")); 151 in2=(parser.in2==null ? null : parser.in2.split(",")); 152 out=parser.out1; 153 } 154 155 assert(synth || (in1!=null && in1.length>0)) : "No primary input file specified."; 156 if(synth){ 157 ffin1=ffin2=null; 158 }else{ 159 ffin1=new FileFormat[in1.length]; 160 ffin2=new FileFormat[in1.length]; 161 162 for(int i=0; i<in1.length; i++){ 163 String a=in1[i]; 164 String b=(in2!=null && in2.length>i ? in2[i] : null); 165 assert(a!=null) : "Null input filename."; 166 if(b==null && a.indexOf('#')>-1 && !new File(a).exists()){ 167 b=a.replace("#", "2"); 168 a=a.replace("#", "1"); 169 } 170 171 ffin1[i]=FileFormat.testInput(a, FileFormat.FASTQ, null, true, true); 172 ffin2[i]=FileFormat.testInput(b, FileFormat.FASTQ, null, true, true); 173 } 174 } 175 176 threads=Shared.threads(); 177 assert(FastaReadInputStream.settingsOK()); 178 } 179 180 process()181 long process(){ 182 Timer t=new Timer(); 183 readsProcessed=basesProcessed=kmersProcessed=0; 184 185 CardinalityTracker log=CardinalityTracker.makeTracker(buckets, k, seed, minProb); 186 187 for(int ffnum=0, max=(synth ? 1 : ffin1.length); ffnum<max; ffnum++){ 188 ConcurrentReadInputStream cris=null; 189 if(!synth){ 190 cris=ConcurrentGenericReadInputStream.getReadInputStream(maxReads, false, ffin1[ffnum], ffin2[ffnum]); 191 cris.start(); 192 } 193 194 LogLogThread[] threadArray=new LogLogThread[threads]; 195 for(int tid=0; tid<threadArray.length; tid++){ 196 threadArray[tid]=new LogLogThread((CardinalityTracker.atomic ? log : CardinalityTracker.makeTracker(buckets, k, seed, minProb)), cris, tid); 197 } 198 for(LogLogThread llt : threadArray){ 199 llt.start(); 200 } 201 for(LogLogThread llt : threadArray){ 202 while(llt.getState()!=Thread.State.TERMINATED){ 203 try { 204 llt.join(); 205 } catch (InterruptedException e) { 206 // TODO Auto-generated catch block 207 e.printStackTrace(); 208 } 209 } 210 readsProcessed+=llt.readsProcessedT; 211 basesProcessed+=llt.basesProcessedT; 212 kmersProcessed+=llt.kmersProcessedT; 213 if(!CardinalityTracker.atomic){log.add(llt.log);} 214 } 215 216 if(cris!=null){errorState|=ReadWrite.closeStreams(cris);} 217 } 218 219 // final int[] max=new int[buckets]; 220 // if(CardinalityTracker.atomic){ 221 // for(int i=0; i<log.maxArray.length(); i++){ 222 // // System.err.println(log.maxArray.get(i)); 223 // max[i]=log.maxArray.get(i); 224 // } 225 // } 226 227 t.stop(); 228 229 230 long cardinality=log.cardinality(); 231 countSum+=(CardinalityTracker.trackCounts ? log.countSum() : 0); 232 233 if(out!=null){ 234 ReadWrite.writeString(cardinality+"\n", out); 235 } 236 237 if(!Parser.silent) { 238 // Arrays.sort(copy); 239 // System.err.println("Median: "+copy[Tools.median(copy)]); 240 241 // System.err.println("Mean: "+Tools.mean(copy)); 242 // System.err.println("Harmonic Mean: "+Tools.harmonicMean(copy)); 243 System.err.println("Cardinality: "+cardinality); 244 // System.err.println("CardinalityH: "+log.cardinalityH()); 245 246 // for(long i : log.counts){System.err.println(i);} 247 248 System.err.println("Time: \t"+t); 249 } 250 251 return cardinality; 252 } 253 makeRead(int len, Random randy, Read r)254 private static Read makeRead(int len, Random randy, Read r){ 255 if(r==null || r.bases==null || r.bases.length!=len){ 256 r=new Read(null, null, 0); 257 r.bases=new byte[len]; 258 } 259 byte[] bases=r.bases; 260 261 int pos=0; 262 final int basesPerRand=4;//Fewer calls to rand should be faster 263 for(int max=bases.length-(bases.length%basesPerRand); pos<max;){ 264 int x=randy.nextInt()%prime; 265 for(int i=0; i<basesPerRand; i++){ 266 int num=x&3; 267 byte b=AminoAcid.numberToBase[num]; 268 bases[pos]=b; 269 pos++; 270 x>>=2; 271 } 272 } 273 for(; pos<bases.length; pos++){ 274 int x=randy.nextInt()%prime; 275 int num=x&3; 276 byte b=AminoAcid.numberToBase[num]; 277 bases[pos]=b; 278 } 279 return r; 280 } 281 282 /*--------------------------------------------------------------*/ 283 /*---------------- Inner Classes ----------------*/ 284 /*--------------------------------------------------------------*/ 285 286 private class LogLogThread extends Thread{ 287 LogLogThread(CardinalityTracker log_, ConcurrentReadInputStream cris_, int tid_)288 LogLogThread(CardinalityTracker log_, ConcurrentReadInputStream cris_, int tid_){ 289 log=log_; 290 cris=cris_; 291 tid=tid_; 292 } 293 294 @Override run()295 public void run(){ 296 if(cris!=null){runCris();} 297 else{runSynth();} 298 } 299 runCris()300 public void runCris(){ 301 final int kt=k; 302 ListNum<Read> ln=cris.nextList(); 303 ArrayList<Read> reads=(ln!=null ? ln.list : null); 304 while(ln!=null && reads!=null && reads.size()>0){//ln!=null prevents a compiler potential null access warning 305 306 for(Read r : reads){ 307 // if(!r.validated()){r.validate(true);} 308 // if(r.mate!=null && !r.mate.validated()){r.mate.validate(true);} 309 log.hash(r); 310 readsProcessedT+=r.pairCount(); 311 basesProcessedT+=r.pairLength(); 312 kmersProcessedT+=r.numPairKmers(kt); 313 } 314 315 cris.returnList(ln); 316 ln=cris.nextList(); 317 reads=(ln!=null ? ln.list : null); 318 } 319 cris.returnList(ln); 320 } 321 runSynth()322 public void runSynth(){ 323 final int kt=k; 324 assert(maxReads>0 && maxReads<Long.MAX_VALUE); 325 long readsLeft=maxReads/threads; 326 readsLeft+=(maxReads%threads>tid ? 1 : 0); 327 Random randy=Shared.threadLocalRandom(seed2<0 ? seed2 : seed2+999999L); 328 329 Read r=null; 330 while(readsLeft>0){ 331 r=makeRead(150, randy, r); 332 log.hash(r); 333 readsProcessedT+=r.pairCount(); 334 basesProcessedT+=r.pairLength(); 335 kmersProcessedT+=r.numPairKmers(kt); 336 readsLeft--; 337 } 338 } 339 340 private final CardinalityTracker log; 341 private final ConcurrentReadInputStream cris; 342 private final int tid; 343 344 protected long readsProcessedT=0; 345 protected long basesProcessedT=0; 346 protected long kmersProcessedT=0; 347 348 } 349 350 /*--------------------------------------------------------------*/ 351 /*---------------- Fields ----------------*/ 352 /*--------------------------------------------------------------*/ 353 354 private int buckets=2048; 355 private int k=31; 356 private long seed=-1; 357 private long seed2=-1; 358 private float minProb=0; 359 private long countSum=0; 360 361 private String[] in1=null; 362 private String[] in2=null; 363 private String out=null; 364 365 /*--------------------------------------------------------------*/ 366 367 protected long readsProcessed=0; 368 protected long basesProcessed=0; 369 protected long kmersProcessed=0; 370 371 private long maxReads=-1; 372 373 boolean overwrite=false; 374 boolean append=false; 375 boolean errorState=false; 376 377 private int trials=1; 378 private boolean synth=false; 379 380 /*--------------------------------------------------------------*/ 381 /*---------------- Final Fields ----------------*/ 382 /*--------------------------------------------------------------*/ 383 384 private final FileFormat[] ffin1; 385 private final FileFormat[] ffin2; 386 387 final int threads; 388 private static final int prime=32452843; //A prime number 389 390 /*--------------------------------------------------------------*/ 391 /*---------------- Common Fields ----------------*/ 392 /*--------------------------------------------------------------*/ 393 394 private static PrintStream outstream=System.err; 395 public static boolean verbose=false; 396 } 397