"""Kazoo testing harnesses"""
import logging
import os
import uuid
import unittest

from kazoo import python2atexit as atexit
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
from kazoo.protocol.connection import _CONNECTION_DROP, _SESSION_EXPIRED
from kazoo.protocol.states import (
    KazooState
)
from kazoo.testing.common import ZookeeperCluster


log = logging.getLogger(__name__)

# Lazily created by get_global_cluster() and shared by every harness
# instance in the process.
CLUSTER = None


def get_global_cluster():
    """Return the shared :class:`ZookeeperCluster`, creating it on first use.

    The cluster is configured from environment variables:

    * ``ZOOKEEPER_PATH`` -- passed as ``install_path``
    * ``ZOOKEEPER_CLASSPATH`` -- passed as ``classpath``
    * ``ZOOKEEPER_PORT_OFFSET`` -- passed as ``port_offset`` (default 20000)
    * ``ZOOKEEPER_CLUSTER_SIZE`` -- passed as ``size`` (default 3)
    * ``ZOOKEEPER_OBSERVER_START_ID`` -- passed as ``observer_start_id``
      (default -1)

    On creation, ``cluster.terminate()`` is registered to run at
    interpreter exit.

    :raises AssertionError: if neither ``ZOOKEEPER_PATH`` nor
        ``ZOOKEEPER_CLASSPATH`` is set.
    :raises ValueError: if one of the numeric variables is not an integer.
    """
    global CLUSTER
    if CLUSTER is None:
        ZK_HOME = os.environ.get("ZOOKEEPER_PATH")
        ZK_CLASSPATH = os.environ.get("ZOOKEEPER_CLASSPATH")
        ZK_PORT_OFFSET = int(os.environ.get("ZOOKEEPER_PORT_OFFSET", 20000))
        ZK_CLUSTER_SIZE = int(os.environ.get("ZOOKEEPER_CLUSTER_SIZE", 3))
        ZK_OBSERVER_START_ID = int(
            os.environ.get("ZOOKEEPER_OBSERVER_START_ID", -1))

        # Raised explicitly (rather than via an ``assert`` statement) so the
        # check still runs under ``python -O``; the exception type callers
        # see is unchanged.
        if not (ZK_HOME or ZK_CLASSPATH):
            raise AssertionError(
                "Either ZOOKEEPER_PATH or ZOOKEEPER_CLASSPATH environment "
                "variable must be defined.\n"
                "For deb package installations this is /usr/share/java")

        CLUSTER = ZookeeperCluster(
            install_path=ZK_HOME,
            classpath=ZK_CLASSPATH,
            port_offset=ZK_PORT_OFFSET,
            size=ZK_CLUSTER_SIZE,
            observer_start_id=ZK_OBSERVER_START_ID
        )
        atexit.register(lambda cluster: cluster.terminate(), CLUSTER)
    return CLUSTER


class KazooTestHarness(unittest.TestCase):
    """Harness for testing code that uses Kazoo

    This object can be used directly or as a mixin. It supports starting
    and stopping a complete ZooKeeper cluster locally and provides an
    API for simulating errors and expiring sessions.

    Example::

        class MyTestCase(KazooTestHarness):
            def setUp(self):
                self.setup_zookeeper()

                # additional test setup

            def tearDown(self):
                self.teardown_zookeeper()

            def test_something(self):
                something_that_needs_a_kazoo_client(self.client)

            def test_something_else(self):
                something_that_needs_zk_servers(self.servers)

    """

    def __init__(self, *args, **kw):
        super(KazooTestHarness, self).__init__(*args, **kw)
        # Primary client; set by setup_zookeeper(), reset by
        # teardown_zookeeper().
        self.client = None
        # Every client handed out by this harness, so teardown can stop
        # and close all of them.
        self._clients = []

    @property
    def cluster(self):
        """The process-wide cluster from :func:`get_global_cluster`."""
        return get_global_cluster()

    @property
    def servers(self):
        """Comma-separated ``address`` of every server in the cluster."""
        return ",".join([s.address for s in self.cluster])

    def _get_nonchroot_client(self):
        """Create a tracked :class:`KazooClient` on the bare server list.

        Unlike :meth:`_get_client`, the connection string carries no
        namespace suffix. The client is not started.
        """
        c = KazooClient(self.servers)
        self._clients.append(c)
        return c

    def _get_client(self, **kwargs):
        """Create a tracked :class:`KazooClient` connected to ``self.hosts``.

        ``self.hosts`` is assigned by :meth:`setup_zookeeper`, so that must
        have been called first. ``kwargs`` are forwarded to
        :class:`KazooClient`. The client is not started.
        """
        c = KazooClient(self.hosts, **kwargs)
        self._clients.append(c)
        return c

    def lose_connection(self, event_factory):
        """Force client to lose connection with server

        :param event_factory: zero-argument callable returning an event
            object with ``set()``, ``is_set()`` and ``wait(timeout)``.
        """
        self.__break_connection(_CONNECTION_DROP, KazooState.SUSPENDED,
                                event_factory)

    def expire_session(self, event_factory):
        """Force ZK to expire a client session

        :param event_factory: zero-argument callable returning an event
            object with ``set()``, ``is_set()`` and ``wait(timeout)``.
        """
        self.__break_connection(_SESSION_EXPIRED, KazooState.LOST,
                                event_factory)

    def setup_zookeeper(self, **client_options):
        """Create a ZK cluster and chrooted :class:`KazooClient`

        The cluster will only be created on the first invocation and won't be
        fully torn down until exit.

        :param client_options: keyword arguments forwarded to
            :class:`KazooClient`; ``timeout`` defaults to 0.8 when absent.
        """
        # Start the cluster if any of its servers is not running.
        if any(not s.running for s in self.cluster):
            self.cluster.start()

        # A unique namespace per call keeps tests from seeing each other's
        # nodes.
        namespace = "/kazootests" + uuid.uuid4().hex
        self.hosts = self.servers + namespace

        client_options.setdefault('timeout', 0.8)
        self.client = self._get_client(**client_options)
        self.client.start()
        self.client.ensure_path("/")

    def teardown_zookeeper(self):
        """Reset and cleanup the zookeeper cluster that was started.

        Stops and closes every client created through this harness. A
        :class:`KazooException` from ``stop()`` is logged and does not
        prevent the remaining clients from being cleaned up.
        """
        while self._clients:
            c = self._clients.pop()
            try:
                c.stop()
            except KazooException:
                log.exception("Failed stopping client %s", c)
            finally:
                # Always close, even when stop() failed.
                c.close()
        self.client = None

    def __break_connection(self, break_event, expected_state, event_factory):
        """Break ZooKeeper connection using the specified event.

        Sends ``break_event`` through ``self.client._call``, waits for the
        client to report ``expected_state`` and then
        ``KazooState.CONNECTED`` again, and finally issues a retried
        ``get_async('/')`` on the client.

        :param break_event: event handed to ``self.client._call``.
        :param expected_state: the :class:`KazooState` that signals the
            connection was broken.
        :param event_factory: zero-argument callable returning an event
            object with ``set()``, ``is_set()`` and ``wait(timeout)``.
        :raises Exception: if ``expected_state`` is not observed within the
            ``wait(5)`` timeout, or ``CONNECTED`` is not observed again
            within the ``wait(15)`` timeout.
        """

        lost = event_factory()
        safe = event_factory()

        def watch_loss(state):
            if state == expected_state:
                lost.set()
            elif lost.is_set() and state == KazooState.CONNECTED:
                safe.set()
                # NOTE(review): returning True presumably asks the client
                # to unregister this listener -- confirm against
                # KazooClient's listener handling.
                return True

        self.client.add_listener(watch_loss)
        try:
            self.client._call(break_event, None)

            lost.wait(5)
            if not lost.is_set():
                raise Exception(
                    "Failed to get notified of broken connection.")

            safe.wait(15)
            if not safe.is_set():
                raise Exception("Failed to see client reconnect.")
        finally:
            # Do not leave the listener attached when we bail out early.
            # remove_listener is expected to tolerate a listener that has
            # already been unregistered.
            self.client.remove_listener(watch_loss)

        self.client.retry(self.client.get_async, '/')


class KazooTestCase(KazooTestHarness):
    """Test case that sets up and tears down ZooKeeper around each test."""

    def setUp(self):
        """Start the cluster if needed and create ``self.client``."""
        self.setup_zookeeper()

    def tearDown(self):
        """Stop and close every client created during the test."""
        self.teardown_zookeeper()