1from collections import Counter 2from hashlib import md5 3 4 5class MetaRing: 6 """Implement a tunable consistent hashing ring.""" 7 8 def __init__(self, hash_fn): 9 """Create a new HashRing. 10 11 :param hash_fn: use this callable function to hash keys. 12 """ 13 self._distribution = Counter() 14 self._keys = [] 15 self._nodes = {} 16 self._ring = {} 17 18 if hash_fn and not hasattr(hash_fn, "__call__"): 19 raise TypeError("hash_fn should be a callable function") 20 self._hash_fn = hash_fn or ( 21 lambda key: int(md5(str(key).encode("utf-8")).hexdigest(), 16) 22 ) 23 24 def hashi(self, key): 25 """Returns an integer derived from the md5 hash of the given key.""" 26 return self._hash_fn(key) 27 28 def _create_ring(self, nodes): 29 """Generate a ketama compatible continuum/ring.""" 30 for node_name, node_conf in nodes: 31 for w in range(0, node_conf["vnodes"] * node_conf["weight"]): 32 self._distribution[node_name] += 1 33 self._ring[self.hashi(f"{node_name}-{w}")] = node_name 34 self._keys = sorted(self._ring.keys()) 35 36 def _remove_node(self, node_name): 37 """Remove the given node from the continuum/ring. 38 39 :param node_name: the node name. 40 """ 41 try: 42 node_conf = self._nodes.pop(node_name) 43 except Exception: 44 raise KeyError( 45 "node '{}' not found, available nodes: {}".format( 46 node_name, self._nodes.keys() 47 ) 48 ) 49 else: 50 self._distribution.pop(node_name) 51 for w in range(0, node_conf["vnodes"] * node_conf["weight"]): 52 del self._ring[self.hashi(f"{node_name}-{w}")] 53 self._keys = sorted(self._ring.keys()) 54