1 package bloom;
2 
3 import java.util.Arrays;
4 import java.util.Random;
5 import java.util.concurrent.ArrayBlockingQueue;
6 
7 import shared.Shared;
8 import shared.Timer;
9 
10 
11 /**
12  *
13  * Uses hashing rather than direct-mapping to support longer kmers.
14  *
15  * @author Brian Bushnell
16  * @date Aug 17, 2012
17  *
18  */
19 public class KCountArray4MT extends KCountArray {
20 
	/** Version identifier used by Java serialization for this class. */
	private static final long serialVersionUID = -575682515926973788L;
25 
main(String[] args)26 	public static void main(String[] args){
27 		long cells=Long.parseLong(args[0]);
28 		int bits=Integer.parseInt(args[1]);
29 		int gap=Integer.parseInt(args[2]);
30 		int hashes=Integer.parseInt(args[3]);
31 
32 		verbose=false;
33 
34 		KCountArray4MT kca=new KCountArray4MT(cells, bits, gap, hashes);
35 
36 		System.out.println(kca.read(0));
37 		kca.increment(0);
38 		System.out.println(kca.read(0));
39 		kca.increment(0);
40 		System.out.println(kca.read(0));
41 		System.out.println();
42 
43 		System.out.println(kca.read(1));
44 		kca.increment(1);
45 		System.out.println(kca.read(1));
46 		kca.increment(1);
47 		System.out.println(kca.read(1));
48 		System.out.println();
49 
50 		System.out.println(kca.read(100));
51 		kca.increment(100);
52 		System.out.println(kca.read(100));
53 		kca.increment(100);
54 		System.out.println(kca.read(100));
55 		kca.increment(100);
56 		System.out.println(kca.read(100));
57 		System.out.println();
58 
59 
60 		System.out.println(kca.read(150));
61 		kca.increment(150);
62 		System.out.println(kca.read(150));
63 		System.out.println();
64 
65 	}
66 
KCountArray4MT(long cells_, int bits_, int gap_, int hashes_)67 	public KCountArray4MT(long cells_, int bits_, int gap_, int hashes_){
68 		super(cells_, bits_, gap_);
69 //		verbose=false;
70 		long words=cells/cellsPerWord;
71 		long x=(words/numArrays);
72 		if(x>=Integer.MAX_VALUE){x=Integer.MAX_VALUE-3;}
73 		assert(x<=Integer.MAX_VALUE);
74 		wordsPerArray=(int)(x);
75 		cellsPerArray=cells/numArrays;
76 		cellMod=cellsPerArray-1;
77 		hashes=hashes_;
78 //		System.out.println("cells="+cells+", words="+words+", wordsPerArray="+wordsPerArray+", numArrays="+numArrays+", hashes="+hashes);
79 //		assert(false);
80 		matrix=new int[numArrays][];
81 		assert(hashes>0 && hashes<=hashMasks.length);
82 	}
83 
84 	@Override
read(final long rawKey)85 	public int read(final long rawKey){
86 		assert(finished);
87 		if(verbose){System.err.println("Reading raw key "+rawKey);}
88 		long key2=hash(rawKey, 0);
89 		int min=readHashed(key2);
90 		for(int i=1; i<hashes && min>0; i++){
91 			if(verbose){System.err.println("Reading. i="+i+", key2="+key2);}
92 			key2=Long.rotateRight(key2, hashBits);
93 			key2=hash(key2, i);
94 			if(verbose){System.err.println("Rot/hash. i="+i+", key2="+key2);}
95 			min=min(min, readHashed(key2));
96 		}
97 		return min;
98 	}
99 
readHashed(long key)100 	private int readHashed(long key){
101 		if(verbose){System.err.print("Reading hashed key "+key);}
102 //		System.out.println("key="+key);
103 		int arrayNum=(int)(key&arrayMask);
104 		key=(key>>>arrayBits)%(cellMod);
105 //		key=(key>>>(arrayBits+1))%(cellMod);
106 //		System.out.println("array="+arrayNum);
107 //		System.out.println("key2="+key);
108 		int[] array=matrix[arrayNum];
109 		int index=(int)(key>>>indexShift);
110 //		assert(false) : indexShift;
111 //		System.out.println("index="+index);
112 		int word=array[index];
113 //		System.out.println("word="+Integer.toHexString(word));
114 		assert(word>>>(cellBits*key) == word>>>(cellBits*(key&cellMask)));
115 //		int cellShift=(int)(cellBits*(key&cellMask));
116 		int cellShift=(int)(cellBits*key);
117 		if(verbose){System.err.println(", array="+arrayNum+", index="+index+", cellShift="+(cellShift%32)+", value="+((int)((word>>>cellShift)&valueMask)));}
118 //		System.out.println("cellShift="+cellShift);
119 		return (int)((word>>>cellShift)&valueMask);
120 	}
121 
122 	@Override
write(final long key, int value)123 	public void write(final long key, int value){
124 		throw new RuntimeException("Not allowed for this class.");
125 	}
126 
127 //	@Override
128 //	/** This should increase speed by doing the first hash outside the critical section, but it does not seem to help. */
129 //	public void increment(long[] keys){
130 //		for(int i=0; i<keys.length; i++){
131 //			keys[i]=hash(keys[i], 0);
132 //		}
133 //		synchronized(buffers){
134 //			for(long key : keys){
135 //				incrementPartiallyHashed(key);
136 //			}
137 //		}
138 //	}
139 
140 	@Override
increment(final long rawKey)141 	public void increment(final long rawKey){
142 		if(verbose){System.err.println("\n*** Incrementing raw key "+rawKey+" ***");}
143 
144 		long key2=rawKey;
145 		for(int i=0; i<hashes; i++){
146 			key2=hash(key2, i);
147 			if(verbose){System.err.println("key2="+key2+", value="+readHashed(key2));}
148 //			assert(readHashed(key2)==0);
149 
150 			int bnum=(int)(key2&arrayMask);
151 			long[] array=buffers[bnum];
152 			int loc=bufferlen[bnum];
153 			array[loc]=key2;
154 			bufferlen[bnum]++;
155 			if(verbose){System.err.println("bufferlen["+bnum+"] = "+bufferlen[bnum]);}
156 			if(bufferlen[bnum]>=array.length){
157 
158 				if(verbose){System.err.println("Moving array.");}
159 				bufferlen[bnum]=0;
160 				buffers[bnum]=new long[array.length];
161 
162 				writers[bnum].add(array);
163 				if(verbose){System.err.println("Moved.");}
164 			}
165 //			assert(read(rawKey)<=min+incr) : "i="+i+", original="+min+", new should be <="+(min+incr)+", new="+read(rawKey)+", max="+maxValue+", key="+rawKey;
166 //			assert(readHashed(key2)>=min+incr) : "i="+i+", original="+min+", new should be <="+(min+incr)+", new="+read(rawKey)+", max="+maxValue+", key="+rawKey;
167 			key2=Long.rotateRight(key2, hashBits);
168 		}
169 	}
170 
incrementPartiallyHashed(final long pKey)171 	private void incrementPartiallyHashed(final long pKey){
172 		if(verbose){System.err.println("\n*** Incrementing key "+pKey+" ***");}
173 
174 		long key2=pKey;
175 
176 		{
177 			int bnum=(int)(key2&arrayMask);
178 			long[] array=buffers[bnum];
179 			int loc=bufferlen[bnum];
180 			array[loc]=key2;
181 			bufferlen[bnum]++;
182 			if(verbose){System.err.println("bufferlen["+bnum+"] = "+bufferlen[bnum]);}
183 			if(bufferlen[bnum]>=array.length){
184 
185 				if(verbose){System.err.println("Moving array.");}
186 				bufferlen[bnum]=0;
187 				buffers[bnum]=new long[array.length];
188 
189 				writers[bnum].add(array);
190 				if(verbose){System.err.println("Moved.");}
191 			}
192 		}
193 
194 		for(int i=1; i<hashes; i++){
195 			key2=Long.rotateRight(key2, hashBits);
196 			key2=hash(key2, i);
197 			if(verbose){System.err.println("key2="+key2+", value="+readHashed(key2));}
198 //			assert(readHashed(key2)==0);
199 
200 			int bnum=(int)(key2&arrayMask);
201 			long[] array=buffers[bnum];
202 			int loc=bufferlen[bnum];
203 			array[loc]=key2;
204 			bufferlen[bnum]++;
205 			if(verbose){System.err.println("bufferlen["+bnum+"] = "+bufferlen[bnum]);}
206 			if(bufferlen[bnum]>=array.length){
207 
208 				if(verbose){System.err.println("Moving array.");}
209 				bufferlen[bnum]=0;
210 				buffers[bnum]=new long[array.length];
211 
212 				writers[bnum].add(array);
213 				if(verbose){System.err.println("Moved.");}
214 			}
215 		}
216 	}
217 
218 	@Override
incrementAndReturn(long key, int incr)219 	public int incrementAndReturn(long key, int incr){
220 		throw new RuntimeException("Operation not supported.");
221 	}
222 
223 	/** Returns unincremented value */
224 	@Override
incrementAndReturnUnincremented(long key, int incr)225 	public int incrementAndReturnUnincremented(long key, int incr){
226 		throw new RuntimeException("Operation not supported.");
227 	}
228 
229 	@Override
transformToFrequency()230 	public long[] transformToFrequency(){
231 		return transformToFrequency(matrix);
232 	}
233 
234 	@Override
toContentsString()235 	public String toContentsString(){
236 		StringBuilder sb=new StringBuilder();
237 		sb.append("[");
238 		String comma="";
239 		for(int[] array : matrix){
240 			for(int i=0; i<array.length; i++){
241 				int word=array[i];
242 				for(int j=0; j<cellsPerWord; j++){
243 					int x=word&valueMask;
244 					sb.append(comma);
245 					sb.append(x);
246 					word>>>=cellBits;
247 					comma=", ";
248 				}
249 			}
250 		}
251 		sb.append("]");
252 		return sb.toString();
253 	}
254 
255 	@Override
usedFraction()256 	public double usedFraction(){return cellsUsed/(double)cells;}
257 
258 	@Override
usedFraction(int mindepth)259 	public double usedFraction(int mindepth){return cellsUsed(mindepth)/(double)cells;}
260 
261 	@Override
cellsUsed(int mindepth)262 	public long cellsUsed(int mindepth){
263 		long count=0;
264 		for(int[] array : matrix){
265 			if(array!=null){
266 				for(int word : array){
267 					while(word>0){
268 						int x=word&valueMask;
269 						if(x>=mindepth){count++;}
270 						word>>>=cellBits;
271 					}
272 				}
273 			}
274 		}
275 		return count;
276 	}
277 
278 
279 	@Override
hash(long key, int row)280 	final long hash(long key, int row){
281 		int cell=(int)((Long.MAX_VALUE&key)%(hashArrayLength-1));
282 //		int cell=(int)(hashCellMask&(key));
283 
284 		if(row==0){//Doublehash only first time
285 			key=key^hashMasks[(row+4)%hashMasks.length][cell];
286 			cell=(int)(hashCellMask&(key>>4));
287 //			cell=(int)(hashCellMask&(key>>hashBits));
288 //			cell=(int)((Long.MAX_VALUE&key)%(hashArrayLength-1));
289 		}
290 
291 		return key^hashMasks[row][cell];
292 	}
293 
294 	/**
295 	 * @param i
296 	 * @param j
297 	 * @return
298 	 */
makeMasks(int rows, int cols)299 	private static long[][] makeMasks(int rows, int cols) {
300 
301 		long seed;
302 		synchronized(KCountArray4MT.class){
303 			seed=counter;
304 			counter++;
305 		}
306 
307 		Timer t=new Timer();
308 		long[][] r=new long[rows][cols];
309 		Random randy=Shared.threadLocalRandom(seed);
310 		for(int i=0; i<r.length; i++){
311 			fillMasks(r[i], randy);
312 		}
313 		t.stop();
314 		if(t.elapsed>200000000L){System.out.println("Mask-creation time: "+t);}
315 		return r;
316 	}
317 
	/**
	 * Fills one row of the mask table with random values.
	 * Each candidate is adjusted until the low 32 bits and the high 32 bits each
	 * contain exactly 16 set bits, and is rejected if its lowest hashBits bits or
	 * its next hashBits bits duplicate those of a mask already accepted for this row.
	 * The sign bit is cleared before the value is stored.
	 *
	 * @param r Row to fill; asserted to have length 1<<hashBits
	 * @param randy Random source; the result depends on the exact order of calls made to it
	 */
	private static void fillMasks(long[] r, Random randy) {
		//Earlier, simpler version kept for reference:
//		for(int i=0; i<r.length; i++){
//			long x=0;
//			while(Long.bitCount(x&0xFFFFFFFF)!=16){
//				x=randy.nextLong();
//			}
//			r[i]=(x&Long.MAX_VALUE);
//		}

		final int hlen=(1<<hashBits);
		assert(r.length==hlen);
		//count1/count2 record which low-field (y) and next-field (z) values are already taken.
		int[] count1=new int[hlen];
		int[] count2=new int[hlen];
		final long mask=hlen-1;

		for(int i=0; i<r.length; i++){
			long x=0;
			int y=0;
			int z=0;
			//x==0 has no set bits, so this loop runs at least once and repeats after every rejection.
			while(Long.bitCount(x&0xFFFFFFFFL)!=16){
				x=randy.nextLong();
				//Set or clear random bits until the low half has exactly 16 set bits.
				while(Long.bitCount(x&0xFFFFFFFFL)<16){
					x|=(1L<<randy.nextInt(32));
				}
				while(Long.bitCount(x&0xFFFFFFFFL)>16){
					x&=(~(1L<<randy.nextInt(32)));
				}
				//Same balancing for the high half.
				while(Long.bitCount(x&0xFFFFFFFF00000000L)<16){
					x|=(1L<<(randy.nextInt(32)+32));
				}
				while(Long.bitCount(x&0xFFFFFFFF00000000L)>16){
					x&=(~(1L<<(randy.nextInt(32)+32)));
				}

//				System.out.print(".");
//				y=(((int)(x&mask))^i);
				y=(((int)(x&mask)));
				z=(int)((x>>hashBits)&mask);
				//Reject the candidate if either bit field was already used in this row.
				if(count1[y]>0 || count2[z]>0){
					x=0;
				}
			}
//			System.out.println(Long.toBinaryString(x));
			r[i]=(x&Long.MAX_VALUE); //Clears the sign bit
			count1[y]++;
			count2[z]++;
		}

	}
367 
368 
369 	@Override
initialize()370 	public void initialize(){
371 		for(int i=0; i<writers.length; i++){
372 			writers[i]=new WriteThread(i);
373 			writers[i].start();
374 
375 //			while(!writers[i].isAlive()){
376 //				System.out.print(".");
377 //			}
378 		}
379 	}
380 
381 	@Override
shutdown()382 	public void shutdown(){
383 		if(finished){return;}
384 		synchronized(this){
385 			if(finished){return;}
386 
387 			//Clear buffers
388 			for(int i=0; i<numArrays; i++){
389 				long[] array=buffers[i];
390 				int len=bufferlen[i];
391 				buffers[i]=null;
392 				bufferlen[i]=0;
393 
394 				if(len<array.length){
395 					array=Arrays.copyOf(array, len);
396 				}
397 
398 				if(array.length>0){
399 					writers[i].add(array);
400 				}
401 			}
402 
403 			//Add poison
404 			for(WriteThread wt : writers){
405 				wt.add(poison);
406 			}
407 
408 			//Wait for termination
409 			for(WriteThread wt : writers){
410 //				System.out.println("wt"+wt.num+" is alive: "+wt.isAlive());
411 				while(wt.isAlive()){
412 //					System.out.println("wt"+wt.num+" is alive: "+wt.isAlive());
413 					try {
414 						wt.join(10000);
415 					} catch (InterruptedException e) {
416 						// TODO Auto-generated catch block
417 						e.printStackTrace();
418 					}
419 					if(wt.isAlive()){System.err.println(wt.getClass().getCanonicalName()+" is taking a long time to die.");}
420 				}
421 				cellsUsed+=wt.cellsUsedPersonal;
422 //				System.out.println("cellsUsed="+cellsUsed);
423 			}
424 
425 			assert(!finished);
426 			finished=true;
427 		}
428 	}
429 
430 	private class WriteThread extends Thread{
431 
WriteThread(int tnum)432 		public WriteThread(int tnum){
433 			num=tnum;
434 		}
435 
436 		@Override
run()437 		public void run(){
438 			assert(matrix[num]==null);
439 			array=new int[wordsPerArray]; //Makes NUMA systems use local memory.
440 
441 			matrix[num]=array;
442 
443 			long[] keys=null;
444 			while(!shutdown){
445 
446 				if(verbose){System.err.println(" - Reading keys for wt"+num+".");}
447 				while(keys==null){
448 					try {
449 						keys=writeQueue.take();
450 					} catch (InterruptedException e) {
451 						// TODO Auto-generated catch block
452 						e.printStackTrace();
453 					}
454 				}
455 				if(keys==poison){
456 //					assert(false);
457 					shutdown=true;
458 				}else{
459 					for(long key : keys){
460 						incrementHashedLocal(key);
461 					}
462 				}
463 //				System.out.println(" -- Read keys for   wt"+num+". poison="+(keys==poison)+", len="+keys.length);
464 				if(verbose){System.err.println(" -- Read keys for   wt"+num+". (success)");}
465 				keys=null;
466 				if(verbose){System.err.println("shutdown="+shutdown);}
467 			}
468 
469 //			System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I died: "+shutdown+", "+(keys==null)+".");
470 //			assert(false) : ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I died: "+shutdown+", "+(keys==null)+".";
471 
472 			array=null;
473 		}
474 
add(long[] keys)475 		void add(long[] keys){
476 //			assert(isAlive());
477 			assert(!shutdown);
478 			if(shutdown){return;}
479 //			assert(keys!=poison);
480 			if(verbose){System.err.println(" + Adding keys to wt"+num+".");}
481 			boolean success=false;
482 			while(!success){
483 				try {
484 					writeQueue.put(keys);
485 					success=true;
486 				} catch (InterruptedException e) {
487 					// TODO Auto-generated catch block
488 					e.printStackTrace();
489 				}
490 			}
491 			if(verbose){System.err.println(" ++ Added keys to wt"+num+". (success)");}
492 		}
493 
incrementHashedLocal(long key)494 		private int incrementHashedLocal(long key){
495 			assert((key&arrayMask)==num);
496 			key=(key>>>arrayBits)%(cellMod);
497 //			key=(key>>>(arrayBits+1))%(cellMod);
498 			int index=(int)(key>>>indexShift);
499 			int word=array[index];
500 			int cellShift=(int)(cellBits*key);
501 			int value=((word>>>cellShift)&valueMask);
502 			if(value==0){cellsUsedPersonal++;}
503 			value=min(value+1, maxValue);
504 			word=(value<<cellShift)|(word&~((valueMask)<<cellShift));
505 			array[index]=word;
506 			return value;
507 		}
508 
509 		private int[] array;
510 		private final int num;
511 		public long cellsUsedPersonal=0;
512 
513 		public ArrayBlockingQueue<long[]> writeQueue=new ArrayBlockingQueue<long[]>(16);
514 		public boolean shutdown=false;
515 
516 	}
517 
518 
cellsUsed()519 	public long cellsUsed(){return cellsUsed;}
520 
	/** Set to true at the end of shutdown(); read() asserts it. */
	//NOTE(review): read without synchronization in shutdown() and read(); consider volatile if other threads call them - confirm.
	private boolean finished=false;

	/** Sum of the writers' cellsUsedPersonal, accumulated in shutdown() */
	private long cellsUsed;
	/** One int[] per sub-array; each entry is allocated by its WriteThread */
	final int[][] matrix;
	/** One writer per sub-array, created in initialize() */
	private final WriteThread[] writers=new WriteThread[numArrays];
	/** Number of hash rounds applied per key */
	private final int hashes;
	/** Length of each sub-array, in 32-bit words */
	final int wordsPerArray;
	/** cells/numArrays */
	private final long cellsPerArray;
	/** cellsPerArray-1; used as the modulus when mapping a hashed key to a cell */
	final long cellMod;
	/** Random masks used by hash(); 8 rows of hashArrayLength masks */
	private final long[][] hashMasks=makeMasks(8, hashArrayLength);

	/** Per-sub-array staging buffers of hashed keys awaiting hand-off to a writer */
	private final long[][] buffers=new long[numArrays][1000];
	/** Number of keys currently staged in each buffer */
	private final int[] bufferlen=new int[numArrays];

	/** Log2 of the mask-table width; also the rotation applied between hash rounds */
	private static final int hashBits=6;
	/** Number of masks per row of hashMasks */
	private static final int hashArrayLength=1<<hashBits;
	/** Bit mask selecting an index into a row of hashMasks */
	private static final int hashCellMask=hashArrayLength-1;
	/** Sentinel batch, compared by identity, that tells a WriteThread to stop */
	static final long[] poison=new long[0];

	/** Seed source for makeMasks; advanced under the class lock */
	private static long counter=0;
541 
542 }
543