package bloom;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;

import shared.Shared;
import shared.Timer;


/**
 * Counting array whose cells are located by hashing the key, with all writes
 * performed by one background {@link WriteThread} per sub-array.
 *
 * Uses hashing rather than direct-mapping to support longer kmers.
 *
 * <p>Lifecycle, as implemented below:
 * <ol>
 * <li>{@link #initialize()} starts the writer threads, each of which allocates
 *     its own row of {@code matrix}.</li>
 * <li>{@link #increment(long)} hashes the key {@code hashes} times and appends
 *     each hashed key to a per-sub-array buffer; full buffers are handed to the
 *     owning writer through a bounded queue.</li>
 * <li>{@link #shutdown()} flushes the partial buffers, sends each writer the
 *     {@code poison} sentinel, waits for the writers to exit and then marks the
 *     structure as finished.</li>
 * <li>{@link #read(long)} asserts that the structure is finished and returns
 *     the minimum of the hashed cells.</li>
 * </ol>
 *
 * @author Brian Bushnell
 * @date Aug 17, 2012
 *
 */
public class KCountArray4MT extends KCountArray {

	private static final long serialVersionUID = -575682515926973788L;

	/**
	 * Small manual driver that prints the count of a few keys before and after
	 * incrementing them.
	 *
	 * <p>NOTE(review): this driver never calls {@link #initialize()} or
	 * {@link #shutdown()}, while {@link #read(long)} asserts {@code finished} and
	 * the rows of {@code matrix} are only allocated by the writer threads. It
	 * looks like it was written for an unbuffered variant; confirm before
	 * relying on its output.
	 *
	 * @param args {@code cells bits gap hashes}, in that order
	 */
	public static void main(String[] args){
		long cells=Long.parseLong(args[0]);
		int bits=Integer.parseInt(args[1]);
		int gap=Integer.parseInt(args[2]);
		int hashes=Integer.parseInt(args[3]);

		verbose=false;

		KCountArray4MT kca=new KCountArray4MT(cells, bits, gap, hashes);

		System.out.println(kca.read(0));
		kca.increment(0);
		System.out.println(kca.read(0));
		kca.increment(0);
		System.out.println(kca.read(0));
		System.out.println();

		System.out.println(kca.read(1));
		kca.increment(1);
		System.out.println(kca.read(1));
		kca.increment(1);
		System.out.println(kca.read(1));
		System.out.println();

		System.out.println(kca.read(100));
		kca.increment(100);
		System.out.println(kca.read(100));
		kca.increment(100);
		System.out.println(kca.read(100));
		kca.increment(100);
		System.out.println(kca.read(100));
		System.out.println();


		System.out.println(kca.read(150));
		kca.increment(150);
		System.out.println(kca.read(150));
		System.out.println();

	}

	/**
	 * Sizes the sub-arrays; the rows themselves are allocated later by the
	 * writer threads.
	 *
	 * @param cells_ total number of cells, forwarded to the superclass
	 * @param bits_ bits per cell, forwarded to the superclass
	 * @param gap_ forwarded to the superclass
	 * @param hashes_ number of hash rounds per key; must be in
	 *        {@code [1, hashMasks.length]} (checked by assertion only)
	 */
	public KCountArray4MT(long cells_, int bits_, int gap_, int hashes_){
		super(cells_, bits_, gap_);
		long words=cells/cellsPerWord;
		long x=(words/numArrays);
		//Clamp so the per-array word count fits in an int array length.
		if(x>=Integer.MAX_VALUE){x=Integer.MAX_VALUE-3;}
		assert(x<=Integer.MAX_VALUE);
		wordsPerArray=(int)(x);
		cellsPerArray=cells/numArrays;
		cellMod=cellsPerArray-1;
		hashes=hashes_;
		matrix=new int[numArrays][];
		assert(hashes>0 && hashes<=hashMasks.length);
	}

	/**
	 * Returns the minimum value found in the cells selected by each hash round
	 * of {@code rawKey}. Stops early once the minimum reaches zero.
	 * Asserts that {@link #shutdown()} has completed.
	 *
	 * @param rawKey unhashed key
	 * @return smallest cell value seen across the hash rounds that were examined
	 */
	@Override
	public int read(final long rawKey){
		assert(finished);
		if(verbose){System.err.println("Reading raw key "+rawKey);}
		long key2=hash(rawKey, 0);
		int min=readHashed(key2);
		for(int i=1; i<hashes && min>0; i++){
			if(verbose){System.err.println("Reading. i="+i+", key2="+key2);}
			//Same rotate-then-hash sequence that increment() applies between rounds.
			key2=Long.rotateRight(key2, hashBits);
			key2=hash(key2, i);
			if(verbose){System.err.println("Rot/hash. i="+i+", key2="+key2);}
			min=min(min, readHashed(key2));
		}
		return min;
	}

	/**
	 * Reads the cell addressed by an already-hashed key.
	 * The low bits ({@code arrayMask}) choose the sub-array; the remaining bits,
	 * reduced modulo {@code cellMod}, choose the cell inside it.
	 *
	 * @param key hashed key
	 * @return value stored in the addressed cell
	 */
	private int readHashed(long key){
		if(verbose){System.err.print("Reading hashed key "+key);}
		int arrayNum=(int)(key&arrayMask);
		key=(key>>>arrayBits)%(cellMod);
		int[] array=matrix[arrayNum];
		int index=(int)(key>>>indexShift);
		int word=array[index];
		//Shifts of an int only use the low five bits of the distance, so masking the key is unnecessary.
		assert(word>>>(cellBits*key) == word>>>(cellBits*(key&cellMask)));
		int cellShift=(int)(cellBits*key);
		if(verbose){System.err.println(", array="+arrayNum+", index="+index+", cellShift="+(cellShift%32)+", value="+((int)((word>>>cellShift)&valueMask)));}
		return (int)((word>>>cellShift)&valueMask);
	}

	/**
	 * Unsupported; always throws.
	 *
	 * @throws RuntimeException always
	 */
	@Override
	public void write(final long key, int value){
		throw new RuntimeException("Not allowed for this class.");
	}

	/**
	 * Hashes {@code rawKey} once per hash round and queues every hashed key for
	 * the writer thread that owns the corresponding sub-array. The counts are
	 * not modified by the calling thread.
	 *
	 * @param rawKey unhashed key
	 */
	@Override
	public void increment(final long rawKey){
		if(verbose){System.err.println("\n*** Incrementing raw key "+rawKey+" ***");}

		long key2=rawKey;
		for(int i=0; i<hashes; i++){
			key2=hash(key2, i);
			if(verbose){System.err.println("key2="+key2+", value="+readHashed(key2));}
			enqueueHashedKey(key2);
			key2=Long.rotateRight(key2, hashBits);
		}
	}

	/**
	 * Variant of {@link #increment(long)} for a key that has already been
	 * through hash round 0. Not called by any code visible in this file; an
	 * earlier batch-increment experiment hashed round 0 outside the critical
	 * section and then used this method.
	 *
	 * @param pKey key after hash round 0
	 */
	private void incrementPartiallyHashed(final long pKey){
		if(verbose){System.err.println("\n*** Incrementing key "+pKey+" ***");}

		long key2=pKey;
		enqueueHashedKey(key2);

		for(int i=1; i<hashes; i++){
			key2=Long.rotateRight(key2, hashBits);
			key2=hash(key2, i);
			if(verbose){System.err.println("key2="+key2+", value="+readHashed(key2));}
			enqueueHashedKey(key2);
		}
	}

	/**
	 * Appends a hashed key to the buffer of its sub-array. When the buffer
	 * becomes full it is replaced with a fresh one and the full buffer is handed
	 * to the owning writer thread.
	 *
	 * @param key2 hashed key; its low bits ({@code arrayMask}) select the buffer
	 */
	private void enqueueHashedKey(final long key2){
		final int bnum=(int)(key2&arrayMask);
		final long[] array=buffers[bnum];
		final int loc=bufferlen[bnum];
		array[loc]=key2;
		bufferlen[bnum]++;
		if(verbose){System.err.println("bufferlen["+bnum+"] = "+bufferlen[bnum]);}
		if(bufferlen[bnum]>=array.length){

			if(verbose){System.err.println("Moving array.");}
			bufferlen[bnum]=0;
			buffers[bnum]=new long[array.length];

			writers[bnum].add(array);
			if(verbose){System.err.println("Moved.");}
		}
	}

	/**
	 * Unsupported; always throws.
	 *
	 * @throws RuntimeException always
	 */
	@Override
	public int incrementAndReturn(long key, int incr){
		throw new RuntimeException("Operation not supported.");
	}

	/**
	 * Would return the unincremented value, but is unsupported; always throws.
	 *
	 * @throws RuntimeException always
	 */
	@Override
	public int incrementAndReturnUnincremented(long key, int incr){
		throw new RuntimeException("Operation not supported.");
	}

	/** Delegates to the inherited overload, passing this instance's {@code matrix}. */
	@Override
	public long[] transformToFrequency(){
		return transformToFrequency(matrix);
	}

	/**
	 * Lists every cell value of every allocated sub-array as
	 * {@code [v0, v1, ...]}. Rows that no writer thread has allocated yet are
	 * skipped, matching the null handling in {@link #cellsUsed(int)}.
	 *
	 * @return comma-separated cell values in square brackets
	 */
	@Override
	public String toContentsString(){
		StringBuilder sb=new StringBuilder();
		sb.append("[");
		String comma="";
		for(int[] array : matrix){
			if(array==null){continue;}
			for(int i=0; i<array.length; i++){
				int word=array[i];
				for(int j=0; j<cellsPerWord; j++){
					int x=word&valueMask;
					sb.append(comma);
					sb.append(x);
					word>>>=cellBits;
					comma=", ";
				}
			}
		}
		sb.append("]");
		return sb.toString();
	}

	/** @return {@code cellsUsed} divided by the total number of cells */
	@Override
	public double usedFraction(){return cellsUsed/(double)cells;}

	/** @return {@link #cellsUsed(int)} divided by the total number of cells */
	@Override
	public double usedFraction(int mindepth){return cellsUsed(mindepth)/(double)cells;}

	/**
	 * Counts cells whose value is at least {@code mindepth} by scanning every
	 * allocated sub-array.
	 *
	 * <p>Each word is scanned until no set bits remain. The test is
	 * {@code word!=0} rather than {@code word>0}: a word whose highest cell has
	 * its top bit set is a negative {@code int}, and a signed comparison would
	 * skip all of its cells.
	 *
	 * @param mindepth minimum cell value to count
	 * @return number of qualifying cells
	 */
	@Override
	public long cellsUsed(int mindepth){
		long count=0;
		for(int[] array : matrix){
			if(array!=null){
				for(int word : array){
					while(word!=0){
						int x=word&valueMask;
						if(x>=mindepth){count++;}
						word>>>=cellBits;
					}
				}
			}
		}
		return count;
	}


	/**
	 * Mixes {@code key} with a mask taken from row {@code row} of
	 * {@code hashMasks}. For row 0 only, the key is first XORed with a mask from
	 * another row and the mask column is re-derived from the result.
	 *
	 * @param key value to hash
	 * @param row hash round, used as the mask-table row
	 * @return hashed key
	 */
	@Override
	final long hash(long key, int row){
		int cell=(int)((Long.MAX_VALUE&key)%(hashArrayLength-1));

		if(row==0){//Doublehash only first time
			key=key^hashMasks[(row+4)%hashMasks.length][cell];
			cell=(int)(hashCellMask&(key>>4));
		}

		return key^hashMasks[row][cell];
	}

	/**
	 * Builds a table of random XOR masks. The random generator is seeded from a
	 * static counter that is read and advanced under the class lock.
	 * Prints the elapsed time when the timer reports more than 200000000 units.
	 *
	 * @param rows number of mask rows
	 * @param cols masks per row; {@link #fillMasks} asserts it equals {@code 1<<hashBits}
	 * @return {@code rows} x {@code cols} mask table
	 */
	private static long[][] makeMasks(int rows, int cols) {

		long seed;
		synchronized(KCountArray4MT.class){
			seed=counter;
			counter++;
		}

		Timer t=new Timer();
		long[][] r=new long[rows][cols];
		Random randy=Shared.threadLocalRandom(seed);
		for(int i=0; i<r.length; i++){
			fillMasks(r[i], randy);
		}
		t.stop();
		if(t.elapsed>200000000L){System.out.println("Mask-creation time: "+t);}
		return r;
	}

	/**
	 * Fills one mask row. Each candidate is adjusted until exactly 16 bits are
	 * set in its low half and 16 in its high half, and is rejected if its lowest
	 * {@code hashBits} bits, or the next {@code hashBits} bits, match those of a
	 * mask already stored in this row. The sign bit is cleared before storing.
	 *
	 * @param r row to fill; its length must be {@code 1<<hashBits}
	 * @param randy source of randomness
	 */
	private static void fillMasks(long[] r, Random randy) {

		final int hlen=(1<<hashBits);
		assert(r.length==hlen);
		int[] count1=new int[hlen];
		int[] count2=new int[hlen];
		final long mask=hlen-1;

		for(int i=0; i<r.length; i++){
			long x=0;
			int y=0;
			int z=0;
			//x==0 has no bits set, so resetting x to 0 below forces another attempt.
			while(Long.bitCount(x&0xFFFFFFFFL)!=16){
				x=randy.nextLong();
				while(Long.bitCount(x&0xFFFFFFFFL)<16){
					x|=(1L<<randy.nextInt(32));
				}
				while(Long.bitCount(x&0xFFFFFFFFL)>16){
					x&=(~(1L<<randy.nextInt(32)));
				}
				while(Long.bitCount(x&0xFFFFFFFF00000000L)<16){
					x|=(1L<<(randy.nextInt(32)+32));
				}
				while(Long.bitCount(x&0xFFFFFFFF00000000L)>16){
					x&=(~(1L<<(randy.nextInt(32)+32)));
				}

				y=(((int)(x&mask)));
				z=(int)((x>>hashBits)&mask);
				if(count1[y]>0 || count2[z]>0){
					x=0;
				}
			}
			r[i]=(x&Long.MAX_VALUE);
			count1[y]++;
			count2[z]++;
		}

	}


	/** Creates and starts one writer thread per sub-array. */
	@Override
	public void initialize(){
		for(int i=0; i<writers.length; i++){
			writers[i]=new WriteThread(i);
			writers[i].start();
		}
	}

	/**
	 * Flushes the pending buffers, stops the writer threads and marks the
	 * structure as finished. Returns immediately if already finished.
	 * After this call the buffers are null, so further increments are not
	 * possible.
	 */
	@Override
	public void shutdown(){
		if(finished){return;}
		synchronized(this){
			if(finished){return;}

			//Clear buffers
			for(int i=0; i<numArrays; i++){
				long[] array=buffers[i];
				int len=bufferlen[i];
				buffers[i]=null;
				bufferlen[i]=0;

				//Trim so the writer only sees keys that were actually queued.
				if(len<array.length){
					array=Arrays.copyOf(array, len);
				}

				if(array.length>0){
					writers[i].add(array);
				}
			}

			//Add poison
			for(WriteThread wt : writers){
				wt.add(poison);
			}

			//Wait for termination
			for(WriteThread wt : writers){
				while(wt.isAlive()){
					try {
						wt.join(10000);
					} catch (InterruptedException e) {
						//Keep waiting; the loop re-checks isAlive().
						e.printStackTrace();
					}
					if(wt.isAlive()){System.err.println(wt.getClass().getCanonicalName()+" is taking a long time to die.");}
				}
				cellsUsed+=wt.cellsUsedPersonal;
			}

			assert(!finished);
			finished=true;
		}
	}

	/**
	 * Owns one row of {@code matrix} and applies the increments queued for it.
	 */
	private class WriteThread extends Thread{

		/** @param tnum index of the sub-array this thread owns */
		public WriteThread(int tnum){
			num=tnum;
		}

		/**
		 * Allocates this thread's row, then applies batches of hashed keys taken
		 * from {@code writeQueue} until the {@code poison} batch arrives.
		 */
		@Override
		public void run(){
			assert(matrix[num]==null);
			array=new int[wordsPerArray]; //Makes NUMA systems use local memory.

			matrix[num]=array;

			long[] keys=null;
			while(!shutdown){

				if(verbose){System.err.println(" - Reading keys for wt"+num+".");}
				//Retry until a batch is obtained; an interrupt only prints a trace.
				while(keys==null){
					try {
						keys=writeQueue.take();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				//Identity comparison: only the shared sentinel array stops the thread.
				if(keys==poison){
					shutdown=true;
				}else{
					for(long key : keys){
						incrementHashedLocal(key);
					}
				}
				if(verbose){System.err.println(" -- Read keys for wt"+num+". (success)");}
				keys=null;
				if(verbose){System.err.println("shutdown="+shutdown);}
			}

			//Drop the local reference; matrix[num] still refers to the row.
			array=null;
		}

		/**
		 * Queues a batch of hashed keys for this thread, blocking while the queue
		 * is full. Does nothing if the thread has already seen the poison batch.
		 *
		 * @param keys hashed keys belonging to this thread's sub-array, or {@code poison}
		 */
		void add(long[] keys){
			assert(!shutdown);
			if(shutdown){return;}
			if(verbose){System.err.println(" + Adding keys to wt"+num+".");}
			boolean success=false;
			//Retry until the batch is accepted; an interrupt only prints a trace.
			while(!success){
				try {
					writeQueue.put(keys);
					success=true;
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			if(verbose){System.err.println(" ++ Added keys to wt"+num+". (success)");}
		}

		/**
		 * Increments the cell addressed by a hashed key, saturating at
		 * {@code maxValue}, and counts the cell as used the first time it leaves
		 * zero.
		 *
		 * @param key hashed key whose low bits must select this thread's sub-array
		 * @return the cell's value after the increment
		 */
		private int incrementHashedLocal(long key){
			assert((key&arrayMask)==num);
			key=(key>>>arrayBits)%(cellMod);
			int index=(int)(key>>>indexShift);
			int word=array[index];
			int cellShift=(int)(cellBits*key);
			int value=((word>>>cellShift)&valueMask);
			if(value==0){cellsUsedPersonal++;}
			value=min(value+1, maxValue);
			//Clear the old cell bits, then merge the new value in.
			word=(value<<cellShift)|(word&~((valueMask)<<cellShift));
			array[index]=word;
			return value;
		}

		/** Row of {@code matrix} owned by this thread; set in {@code run()}. */
		private int[] array;
		/** Index of the owned sub-array. */
		private final int num;
		/** Number of cells this thread moved from zero to nonzero. */
		public long cellsUsedPersonal=0;

		/** Pending batches for this thread; holds at most 16. */
		public ArrayBlockingQueue<long[]> writeQueue=new ArrayBlockingQueue<long[]>(16);
		/** Set by {@code run()} once the poison batch has been received. */
		public boolean shutdown=false;

	}


	/** @return the used-cell total accumulated from the writer threads during {@link #shutdown()} */
	public long cellsUsed(){return cellsUsed;}

	/** True once {@link #shutdown()} has completed. */
	private boolean finished=false;

	/** Sum of each writer's {@code cellsUsedPersonal}, added up in {@link #shutdown()}. */
	private long cellsUsed;
	/** One row per sub-array; each row is allocated by its writer thread. */
	final int[][] matrix;
	private final WriteThread[] writers=new WriteThread[numArrays];
	/** Number of hash rounds applied per key. */
	private final int hashes;
	final int wordsPerArray;
	private final long cellsPerArray;
	/** Modulus applied to a hashed key to choose a cell within a sub-array. */
	final long cellMod;
	private final long[][] hashMasks=makeMasks(8, hashArrayLength);

	/** Per-sub-array buffers of hashed keys awaiting hand-off to the writers. */
	private final long[][] buffers=new long[numArrays][1000];
	/** Number of keys currently stored in each buffer. */
	private final int[] bufferlen=new int[numArrays];

	private static final int hashBits=6;
	private static final int hashArrayLength=1<<hashBits;
	private static final int hashCellMask=hashArrayLength-1;
	/** Zero-length sentinel batch; recognised by identity in {@code WriteThread.run()}. */
	static final long[] poison=new long[0];

	/** Seed source for {@link #makeMasks}; guarded by the class lock. */
	private static long counter=0;

}