1 package org.jgroups.blocks;
2 
3 import org.jgroups.Address;
4 import org.jgroups.JChannel;
5 import org.jgroups.MembershipListener;
6 import org.jgroups.View;
7 import org.jgroups.annotations.Experimental;
8 import org.jgroups.annotations.ManagedAttribute;
9 import org.jgroups.annotations.ManagedOperation;
10 import org.jgroups.annotations.Unsupported;
11 import org.jgroups.logging.Log;
12 import org.jgroups.logging.LogFactory;
13 import org.jgroups.util.Util;
14 
15 import java.io.ByteArrayInputStream;
16 import java.io.ByteArrayOutputStream;
17 import java.io.DataInputStream;
18 import java.io.DataOutputStream;
19 import java.lang.reflect.Method;
20 import java.util.*;
21 
22 /** Hashmap which distributes its keys and values across the cluster. A PUT/GET/REMOVE computes the cluster node to which
23  * or from which to get/set the key/value from a hash of the key and then forwards the request to the remote cluster node.
24  * We also maintain a local cache (L1 cache) which is a bounded cache that caches retrieved keys/values. <br/>
25  * Todos:<br/>
26  * <ol>
27  * <li>Use MarshalledValue to keep track of byte[] buffers, and be able to compute the exact size of the cache. This is
28  *     good for maintaining a bounded cache (rather than using the number of entries)
29  * <li>Provide a better consistent hashing algorithm than ConsistentHashFunction as default
30  * <li>GUI (showing at least the topology and L1 and L2 caches)
31  * <li>Notifications (puts, removes, gets etc)
32  * <li>Invalidation of L1 caches (if used) on removal/put of item
33  * <li>Benchmarks, comparison to memcached
34  * <li>Documentation, comparison to memcached
35  * </ol>
36  * @author Bela Ban
37  */
38 @Experimental @Unsupported
39 public class PartitionedHashMap<K,V> implements MembershipListener {
40 
    /** The cache in which all partitioned entries are located */
    private Cache<K,V> l2_cache=new Cache<K,V>();

    /** The local bounded cache, to speed up access to frequently accessed entries. Can be disabled or enabled */
    private Cache<K,V> l1_cache=null;

    private static final Log log=LogFactory.getLog(PartitionedHashMap.class);
    /** The channel; created and connected in start(), closed in stop() */
    private JChannel ch=null;
    /** Our own address, read from the channel in start(); null until then */
    private Address local_addr=null;
    /** The most recent view; set in start() and on every viewAccepted() */
    private View    view;
    /** Dispatcher used for the remote PUT/GET/REMOVE calls; created in start() */
    private RpcDispatcher disp=null;
    /** Channel configuration handed to the JChannel constructor in start() */
    @ManagedAttribute(writable=true)
    private String props="udp.xml";
    /** Name of the cluster the channel connects to in start() */
    @ManagedAttribute(writable=true)
    private String cluster_name="PartitionedHashMap-Cluster";
    /** Timeout passed to every remote method call (presumably milliseconds - TODO confirm against RpcDispatcher) */
    @ManagedAttribute(writable=true)
    private long call_timeout=1000L;
    /** Default time to live used by put(key, val) */
    @ManagedAttribute(writable=true)
    private long caching_time=30000L; // in milliseconds. -1 means don't cache, 0 means cache forever (or until changed)
    /** Picks the node responsible for a key; used by getNode() and stop() */
    private HashFunction<K> hash_function=null;
    /** Listeners to which viewAccepted() forwards each new view */
    private Set<MembershipListener> membership_listeners=new HashSet<MembershipListener>();

    /** On a view change, if a member P1 detects that for any given key K, P1 is not the owner of K, then
     * it will compute the new owner P2 and transfer ownership for all Ks for which P2 is the new owner. P1
     * will then also evict those keys from its L2 cache */
    @ManagedAttribute(writable=true)
    private boolean migrate_data=false;

    // Ids under which the RPC target methods (_put, _get, _remove) are registered in the methods map
    private static final short PUT     = 1;
    private static final short GET     = 2;
    private static final short REMOVE  = 3;

    /** Maps a method id to the Method invoked for it; consulted by the MethodLookup installed in start() */
    protected static Map<Short,Method> methods=Util.createConcurrentMap(8);
74 
75      static {
76         try {
methods.put(PUT, PartitionedHashMap.class.getMethod(R, Object.class, Object.class, long.class))77             methods.put(PUT, PartitionedHashMap.class.getMethod("_put",
78                                                                 Object.class,
79                                                                 Object.class,
80                                                                 long.class));
methods.put(GET, PartitionedHashMap.class.getMethod(R, Object.class))81             methods.put(GET, PartitionedHashMap.class.getMethod("_get",
82                                                                Object.class));
methods.put(REMOVE, PartitionedHashMap.class.getMethod(R, Object.class))83             methods.put(REMOVE, PartitionedHashMap.class.getMethod("_remove", Object.class));
84         }
85         catch(NoSuchMethodException e) {
86             throw new RuntimeException(e);
87         }
88     }
89 
90 
    public interface HashFunction<K> {
        /**
         * Defines a hash function to pick the right node from the list of cluster nodes. Ideally, this function uses
         * consistent hashing, so that the same key maps to the same node despite cluster view changes. If a view change
         * causes all keys to hash to different nodes, then PartitionedHashMap will redirect requests to different nodes
         * and this causes unnecessary overhead.
         * @param key The object to be hashed
         * @param membership The membership. This value can be ignored for example if the hash function keeps
         * track of the membership itself, e.g. by registering as a membership
         * listener ({@link PartitionedHashMap#addMembershipListener(org.jgroups.MembershipListener)} ).
         * PartitionedHashMap itself passes null here for regular put/get/remove requests
         * @return The address of the node picked for the key. The implementations in this file return null when
         * they do not know of any node
         */
        Address hash(K key, List<Address> membership);
    }
105 
106 
    /**
     * Creates an instance; nothing is connected until start() is called.
     * @param props The channel configuration, passed to the JChannel constructor in start()
     * @param cluster_name The name of the cluster to connect to in start()
     */
    public PartitionedHashMap(String props, String cluster_name) {
        this.props=props;
        this.cluster_name=cluster_name;
    }
111 
    /** @return The channel configuration which start() passes to the JChannel constructor */
    public String getProps() {
        return props;
    }
115 
    /**
     * Sets the channel configuration. The value is read when start() creates the channel.
     * @param props The channel configuration
     */
    public void setProps(String props) {
        this.props=props;
    }
119 
    /** @return The address read from the channel in start(), or null if start() has not set it yet */
    public Address getLocalAddress() {
        return local_addr;
    }
123 
124     @ManagedAttribute
getLocalAddressAsString()125     public String getLocalAddressAsString() {
126         return local_addr != null? local_addr.toString() : "null";
127     }
128 
129     @ManagedAttribute
getView()130     public String getView() {
131         return view != null? view.toString() : "null";
132     }
133 
    /** @return true if an L1 cache is currently set (see setL1Cache()), false otherwise */
    @ManagedAttribute
    public boolean isL1CacheEnabled() {
        return l1_cache != null;
    }
138 
    /** @return The name of the cluster which start() connects the channel to */
    public String getClusterName() {
        return cluster_name;
    }
142 
    /**
     * Sets the cluster name. The value is read when start() connects the channel.
     * @param cluster_name The name of the cluster
     */
    public void setClusterName(String cluster_name) {
        this.cluster_name=cluster_name;
    }
146 
    /** @return The timeout passed to the dispatcher for every remote PUT/GET/REMOVE call */
    public long getCallTimeout() {
        return call_timeout;
    }
150 
    /**
     * Sets the timeout passed to the dispatcher for every remote PUT/GET/REMOVE call.
     * @param call_timeout The timeout (presumably milliseconds - TODO confirm against RpcDispatcher)
     */
    public void setCallTimeout(long call_timeout) {
        this.call_timeout=call_timeout;
    }
154 
    /** @return The default time to live (in milliseconds) which put(key, val) applies to new entries */
    public long getCachingTime() {
        return caching_time;
    }
158 
    /**
     * Sets the default time to live which put(key, val) applies to new entries.
     * @param caching_time Milliseconds to cache an entry; -1 means don't cache, 0 means cache forever
     */
    public void setCachingTime(long caching_time) {
        this.caching_time=caching_time;
    }
162 
    /** @return true if entries are handed over to their new owner on a view change and on stop() */
    public boolean isMigrateData() {
        return migrate_data;
    }
166 
    /**
     * Enables or disables the migration of entries on a view change and on stop().
     * @param migrate_data true to migrate entries to their new owner, false to leave them where they are
     */
    public void setMigrateData(boolean migrate_data) {
        this.migrate_data=migrate_data;
    }
170 
getHashFunction()171     public HashFunction getHashFunction() {
172         return hash_function;
173     }
174 
    /**
     * Sets the hash function which getNode() uses to pick the node responsible for a key.
     * @param hash_function The hash function
     */
    public void setHashFunction(HashFunction<K> hash_function) {
        this.hash_function=hash_function;
    }
178 
    /**
     * Registers a listener to which viewAccepted() forwards every new view.
     * @param l The listener to add
     */
    public void addMembershipListener(MembershipListener l) {
        membership_listeners.add(l);
    }
182 
    /**
     * Unregisters a listener previously added with addMembershipListener().
     * @param l The listener to remove
     */
    public void removeMembershipListener(MembershipListener l) {
        membership_listeners.remove(l);
    }
186 
    /** @return The local (L1) cache, or null if none is set */
    public Cache<K,V> getL1Cache() {
        return l1_cache;
    }
190 
setL1Cache(Cache<K,V> cache)191     public void setL1Cache(Cache<K,V> cache) {
192         if(l1_cache != null)
193             l1_cache.stop();
194         l1_cache=cache;
195     }
196 
    /** @return The cache holding the partitioned entries owned by this node */
    public Cache<K,V> getL2Cache() {
        return l2_cache;
    }
200 
setL2Cache(Cache<K,V> cache)201     public void setL2Cache(Cache<K,V> cache) {
202         if(l2_cache != null)
203             l2_cache.stop();
204         l2_cache=cache;
205     }
206 
207 
208     @ManagedOperation
start()209     public void start() throws Exception {
210         hash_function=new ConsistentHashFunction<K>();
211         addMembershipListener((MembershipListener)hash_function);
212         ch=new JChannel(props);
213         disp=new RpcDispatcher(ch, null, this, this);
214         RpcDispatcher.Marshaller marshaller=new CustomMarshaller();
215         disp.setRequestMarshaller(marshaller);
216         disp.setResponseMarshaller(marshaller);
217         disp.setMethodLookup(new MethodLookup() {
218             public Method findMethod(short id) {
219                 return methods.get(id);
220             }
221         });
222 
223         ch.connect(cluster_name);
224         local_addr=ch.getAddress();
225         view=ch.getView();
226     }
227 
228     @ManagedOperation
stop()229     public void stop() {
230         if(l1_cache != null)
231             l1_cache.stop();
232         if(migrate_data) {
233             List<Address> members_without_me=new ArrayList<Address>(view.getMembers());
234             members_without_me.remove(local_addr);
235 
236             for(Map.Entry<K,Cache.Value<V>> entry: l2_cache.entrySet()) {
237                 K key=entry.getKey();
238                 Address node=hash_function.hash(key, members_without_me);
239                 if(!node.equals(local_addr)) {
240                     Cache.Value<V> val=entry.getValue();
241                     sendPut(node, key, val.getValue(), val.getTimeout(), true);
242                     if(log.isTraceEnabled())
243                         log.trace("migrated " + key + " from " + local_addr + " to " + node);
244                 }
245             }
246         }
247         l2_cache.stop();
248         disp.stop();
249         ch.close();
250     }
251 
    /**
     * Adds a key/value to the cache using the default caching time (see setCachingTime()).
     * @param key The key
     * @param val The value
     */
    @ManagedOperation
    public void put(K key, V val) {
        put(key, val, caching_time);
    }
256 
257     /**
258      * Adds a key/value to the cache, replacing a previous item if there was one
259      * @param key The key
260      * @param val The value
261      * @param caching_time Time to live. -1 means never cache, 0 means cache forever. All other (positive) values
262      * are the number of milliseconds to cache the item
263      */
264     @ManagedOperation
put(K key, V val, long caching_time)265     public void put(K key, V val, long caching_time) {
266         Address dest_node=getNode(key);
267         if(dest_node.equals(local_addr)) {
268             l2_cache.put(key, val, caching_time);
269         }
270         else {
271             sendPut(dest_node, key, val, caching_time, false);
272         }
273         if(l1_cache != null && caching_time >= 0)
274             l1_cache.put(key, val, caching_time);
275     }
276 
277     @ManagedOperation
get(K key)278     public V get(K key) {
279         if(l1_cache != null) {
280             V val=l1_cache.get(key);
281             if(val != null) {
282                 if(log.isTraceEnabled())
283                     log.trace("returned value " + val + " for " + key + " from L1 cache");
284                 return val;
285             }
286         }
287 
288         Cache.Value<V> val;
289         try {
290             Address dest_node=getNode(key);
291             // if we are the destination, don't invoke an RPC but return the item from our L2 cache directly !
292             if(dest_node.equals(local_addr)) {
293                 val=l2_cache.getEntry(key);
294             }
295             else {
296                 val=(Cache.Value<V>)disp.callRemoteMethod(dest_node,
297                                                           new MethodCall(GET, new Object[]{key}),
298                                                           GroupRequest.GET_FIRST,
299                                                           call_timeout);
300             }
301             if(val != null) {
302                 V retval=val.getValue();
303                 if(l1_cache != null && val.getTimeout() >= 0)
304                     l1_cache.put(key, retval, val.getTimeout());
305                 return retval;
306             }
307             return null;
308         }
309         catch(Throwable t) {
310             if(log.isWarnEnabled())
311                 log.warn("_get() failed", t);
312             return null;
313         }
314     }
315 
316     @ManagedOperation
remove(K key)317     public void remove(K key) {
318         Address dest_node=getNode(key);
319 
320         try {
321             if(dest_node.equals(local_addr)) {
322                 l2_cache.remove(key);
323             }
324             else {
325                 disp.callRemoteMethod(dest_node, new MethodCall(REMOVE, new Object[]{key}),
326                                       GroupRequest.GET_NONE, call_timeout);
327             }
328             if(l1_cache != null)
329                 l1_cache.remove(key);
330         }
331         catch(Throwable t) {
332             if(log.isWarnEnabled())
333                 log.warn("_remove() failed", t);
334         }
335     }
336 
337 
    /**
     * RPC target registered under the PUT id: stores the entry in the L2 cache of this node.
     * @param key The key
     * @param val The value
     * @param caching_time Time to live, passed on to the L2 cache
     * @return Whatever the L2 cache's put() returns
     */
    public V _put(K key, V val, long caching_time) {
        if(log.isTraceEnabled())
            log.trace("_put(" + key + ", " + val + ", " + caching_time + ")");
        return l2_cache.put(key, val, caching_time);
    }
343 
    /**
     * RPC target registered under the GET id: reads the entry (value plus timeout) from the L2 cache of this node.
     * @param key The key
     * @return Whatever the L2 cache's getEntry() returns for the key
     */
    public Cache.Value<V> _get(K key) {
        if(log.isTraceEnabled())
            log.trace("_get(" + key + ")");
        return l2_cache.getEntry(key);
    }
349 
    /**
     * RPC target registered under the REMOVE id: removes the entry from the L2 cache of this node.
     * @param key The key
     * @return Whatever the L2 cache's remove() returns
     */
    public V _remove(K key) {
        if(log.isTraceEnabled())
            log.trace("_remove(" + key + ")");
        return l2_cache.remove(key);
    }
355 
356 
357 
358 
359 
viewAccepted(View new_view)360     public void viewAccepted(View new_view) {
361         System.out.println("view = " + new_view);
362         this.view=new_view;
363         for(MembershipListener l: membership_listeners) {
364             l.viewAccepted(new_view);
365         }
366 
367         if(migrate_data) {
368             migrateData();
369         }
370     }
371 
    /** Does nothing; suspicions are ignored by this class */
    public void suspect(Address suspected_mbr) {
    }
374 
    /** Does nothing; block notifications are ignored by this class */
    public void block() {
    }
377 
378 
toString()379     public String toString() {
380         StringBuilder sb=new StringBuilder();
381         if(l1_cache != null)
382             sb.append("L1 cache: " + l1_cache.getSize() + " entries");
383         sb.append("\nL2 cache: " + l2_cache.getSize() + "entries()");
384         return sb.toString();
385     }
386 
387 
388     @ManagedOperation
dump()389     public String dump() {
390         StringBuilder sb=new StringBuilder();
391         if(l1_cache != null) {
392             sb.append("L1 cache:\n").append(l1_cache.dump());
393         }
394         sb.append("\nL2 cache:\n").append(l2_cache.dump());
395 
396         return sb.toString();
397     }
398 
399 
migrateData()400     private void migrateData() {
401         for(Map.Entry<K,Cache.Value<V>> entry: l2_cache.entrySet()) {
402             K key=entry.getKey();
403             Address node=getNode(key);
404             if(!node.equals(local_addr)) {
405                 Cache.Value<V> val=entry.getValue();
406                 put(key, val.getValue(), val.getTimeout());
407                 l2_cache.remove(key);
408                 if(log.isTraceEnabled())
409                     log.trace("migrated " + key + " from " + local_addr + " to " + node);
410             }
411         }
412     }
413 
sendPut(Address dest, K key, V val, long caching_time, boolean synchronous)414     private void sendPut(Address dest, K key, V val, long caching_time, boolean synchronous) {
415         try {
416             int mode=synchronous? GroupRequest.GET_ALL : GroupRequest.GET_NONE;
417             disp.callRemoteMethod(dest, new MethodCall(PUT, new Object[]{key, val, caching_time}), mode, call_timeout);
418         }
419         catch(Throwable t) {
420             if(log.isWarnEnabled())
421                 log.warn("_put() failed", t);
422         }
423     }
424 
    /**
     * Asks the hash function for the node responsible for a key. No membership is passed (null); the hash
     * functions in this file then use the nodes they recorded from the last view.
     * @param key The key
     * @return The node picked by the hash function
     */
    private Address getNode(K key) {
        return hash_function.hash(key, null);
    }
428 
429 
430     public static class ConsistentHashFunction<K> extends MembershipListenerAdapter implements HashFunction<K> {
431         private SortedMap<Short,Address> nodes=new TreeMap<Short,Address>();
432         private final static int HASH_SPACE=2000; // must be > max number of nodes in a cluster
433 
hash(K key, List<Address> members)434         public Address hash(K key, List<Address> members) {
435             int hash=Math.abs(key.hashCode());
436             int index=hash % HASH_SPACE;
437 
438             if(members != null && !members.isEmpty()) {
439                 SortedMap<Short,Address> tmp=new TreeMap<Short,Address>(nodes);
440                 for(Iterator<Map.Entry<Short,Address>> it=tmp.entrySet().iterator(); it.hasNext();) {
441                     Map.Entry<Short, Address> entry=it.next();
442                     if(!members.contains(entry.getValue())) {
443                         it.remove();
444                     }
445                 }
446                 return findFirst(tmp, index);
447             }
448             return findFirst(nodes, index);
449         }
450 
viewAccepted(View new_view)451         public void viewAccepted(View new_view) {
452             nodes.clear();
453             for(Address node: new_view.getMembers()) {
454                 int hash=Math.abs(node.hashCode()) % HASH_SPACE;
455                 for(int i=hash; i < hash + HASH_SPACE; i++) {
456                     short new_index=(short)(i % HASH_SPACE);
457                     if(!nodes.containsKey(new_index)) {
458                         nodes.put(new_index, node);
459                         break;
460                     }
461                 }
462             }
463 
464             if(log.isTraceEnabled()) {
465                 StringBuilder sb=new StringBuilder("node mappings:\n");
466                 for(Map.Entry<Short,Address> entry: nodes.entrySet()) {
467                     sb.append(entry.getKey() + ": " + entry.getValue()).append("\n");
468                 }
469                 log.trace(sb);
470             }
471         }
472 
473 
findFirst(Map<Short,Address> map, int index)474         private static Address findFirst(Map<Short,Address> map, int index) {
475             Address retval;
476             for(int i=index; i < index + HASH_SPACE; i++) {
477                 short new_index=(short)(i % HASH_SPACE);
478                 retval=map.get(new_index);
479                 if(retval != null)
480                     return retval;
481             }
482             return null;
483         }
484     }
485 
486 
487     /**
488      * Uses arrays to store hash values of addresses, plus addresses.
489      */
490     public static class ArrayBasedConsistentHashFunction<K> extends MembershipListenerAdapter implements HashFunction<K> {
491         Object[] nodes=null;
492         private final static int HASH_SPACE=2000; // must be > max number of nodes in a cluster
493 
hash(K key, List<Address> members)494         public Address hash(K key, List<Address> members) {
495             int hash=Math.abs(key.hashCode());
496             int index=hash % HASH_SPACE;
497 
498             if(members != null && !members.isEmpty()) {
499                 Object[] tmp=new Object[nodes.length];
500                 System.arraycopy(nodes, 0, tmp, 0, nodes.length);
501                 for(int i=0; i < tmp.length; i+=2) {
502                     if(!members.contains(tmp[i+1])) {
503                         tmp[i]=tmp[i+1]=null;
504                     }
505                 }
506                 return findFirst(tmp, index);
507             }
508             return findFirst(nodes, index);
509         }
510 
viewAccepted(View new_view)511         public void viewAccepted(View new_view) {
512             nodes=new Object[new_view.size() * 2];
513             int index=0;
514             for(Address node: new_view.getMembers()) {
515                 int hash=Math.abs(node.hashCode()) % HASH_SPACE;
516                 nodes[index++]=hash;
517                 nodes[index++]=node;
518             }
519 
520             if(log.isTraceEnabled()) {
521                 StringBuilder sb=new StringBuilder("node mappings:\n");
522                 for(int i=0; i < nodes.length; i+=2) {
523                     sb.append(nodes[i] + ": " + nodes[i+1]).append("\n");
524                 }
525                 log.trace(sb);
526             }
527         }
528 
suspect(Address suspected_mbr)529         public void suspect(Address suspected_mbr) {
530         }
531 
block()532         public void block() {
533         }
534 
findFirst(Object[] array, int index)535         private static Address findFirst(Object[] array, int index) {
536             Address retval=null;
537             if(array == null)
538                 return null;
539 
540             for(int i=0; i < array.length; i+=2) {
541                 if(array[i] == null)
542                     continue;
543                 if(array[i+1] != null)
544                     retval=(Address)array[i+1];
545                 if(((Integer)array[i]) >= index)
546                     return (Address)array[i+1];
547             }
548             return retval;
549         }
550     }
551 
552 
553     private static class CustomMarshaller implements RpcDispatcher.Marshaller {
554         static final byte NULL        = 0;
555         static final byte OBJ         = 1;
556         static final byte METHOD_CALL = 2;
557         static final byte VALUE       = 3;
558 
559 
objectToByteBuffer(Object obj)560         public byte[] objectToByteBuffer(Object obj) throws Exception {
561 
562             ByteArrayOutputStream out_stream=new ByteArrayOutputStream(35);
563             DataOutputStream out=new DataOutputStream(out_stream);
564             try {
565                 if(obj == null) {
566                     out_stream.write(NULL);
567                     out_stream.flush();
568                     return out_stream.toByteArray();
569                 }
570                 if(obj instanceof MethodCall) {
571                     out.writeByte(METHOD_CALL);
572                     MethodCall call=(MethodCall)obj;
573                     out.writeShort(call.getId());
574                     Object[] args=call.getArgs();
575                     if(args == null || args.length == 0) {
576                         out.writeShort(0);
577                     }
578                     else {
579                         out.writeShort(args.length);
580                         for(int i=0; i < args.length; i++) {
581                             Util.objectToStream(args[i], out);
582                         }
583                     }
584                 }
585                 else if(obj instanceof Cache.Value) {
586                     Cache.Value value=(Cache.Value)obj;
587                     out.writeByte(VALUE);
588                     out.writeLong(value.getTimeout());
589                     Util.objectToStream(value.getValue(), out);
590                 }
591                 else {
592                     out.writeByte(OBJ);
593                     Util.objectToStream(obj, out);
594                 }
595                 out.flush();
596                 return out_stream.toByteArray();
597             }
598             finally {
599                 Util.close(out);
600             }
601         }
602 
objectFromByteBuffer(byte[] buf)603         public Object objectFromByteBuffer(byte[] buf) throws Exception {
604             if(buf == null)
605                 return null;
606 
607             DataInputStream in=new DataInputStream(new ByteArrayInputStream(buf));
608             byte type=in.readByte();
609             if(type == NULL)
610                 return null;
611             if(type == METHOD_CALL) {
612                 short id=in.readShort();
613                 short length=in.readShort();
614                 Object[] args=length > 0? new Object[length] : null;
615                 if(args != null) {
616                     for(int i=0; i < args.length; i++)
617                         args[i]=Util.objectFromStream(in);
618                 }
619                 return new MethodCall(id, args);
620             }
621             else if(type == VALUE) {
622                 long expiration_time=in.readLong();
623                 Object obj=Util.objectFromStream(in);
624                 return new Cache.Value(obj, expiration_time);
625             }
626             else
627                 return Util.objectFromStream(in);
628         }
629     }
630 }
631