1"""Kazoo testing harnesses"""
2import logging
3import os
4import uuid
5import unittest
6
7from kazoo import python2atexit as atexit
8from kazoo.client import KazooClient
9from kazoo.exceptions import KazooException
10from kazoo.protocol.connection import _CONNECTION_DROP, _SESSION_EXPIRED
11from kazoo.protocol.states import (
12    KazooState
13)
14from kazoo.testing.common import ZookeeperCluster
15
16
# Module logger; used when stopping a client fails during teardown.
log = logging.getLogger(__name__)

# Lazily created by get_global_cluster() and reused on every later call.
CLUSTER = None
20
21
def get_global_cluster():
    """Return the module-wide :class:`ZookeeperCluster`, building it once.

    The first call reads its configuration from the environment:

    * ``ZOOKEEPER_PATH`` / ``ZOOKEEPER_CLASSPATH`` -- at least one of the
      two must be set.
    * ``ZOOKEEPER_PORT_OFFSET`` -- integer, defaults to 20000.
    * ``ZOOKEEPER_CLUSTER_SIZE`` -- integer, defaults to 3.
    * ``ZOOKEEPER_OBSERVER_START_ID`` -- integer, defaults to -1.

    The new cluster is stored in the ``CLUSTER`` global and its
    ``terminate()`` method is registered to run at interpreter exit.
    Later calls return the stored object unchanged.

    :raises AssertionError: if neither path variable is defined.
    :raises ValueError: if one of the integer variables cannot be parsed.
    """
    global CLUSTER
    if CLUSTER is not None:
        return CLUSTER

    env = os.environ
    install_path = env.get("ZOOKEEPER_PATH")
    classpath = env.get("ZOOKEEPER_CLASSPATH")
    port_offset = int(env.get("ZOOKEEPER_PORT_OFFSET", 20000))
    cluster_size = int(env.get("ZOOKEEPER_CLUSTER_SIZE", 3))
    observer_start_id = int(env.get("ZOOKEEPER_OBSERVER_START_ID", -1))

    assert install_path or classpath, (
        "Either ZOOKEEPER_PATH or ZOOKEEPER_CLASSPATH environment "
        "variable must be defined.\n"
        "For deb package installations this is /usr/share/java")

    CLUSTER = ZookeeperCluster(
        install_path=install_path,
        classpath=classpath,
        port_offset=port_offset,
        size=cluster_size,
        observer_start_id=observer_start_id
    )
    # Make sure the cluster is terminated when the interpreter exits.
    atexit.register(lambda cluster: cluster.terminate(), CLUSTER)
    return CLUSTER
46
47
class KazooTestHarness(unittest.TestCase):
    """Harness for testing code that uses Kazoo

    This object can be used directly or as a mixin. It supports starting
    and stopping a complete ZooKeeper cluster locally and provides an
    API for simulating errors and expiring sessions.

    Example::

        class MyTestCase(KazooTestHarness):
            def setUp(self):
                self.setup_zookeeper()

                # additional test setup

            def tearDown(self):
                self.teardown_zookeeper()

            def test_something(self):
                something_that_needs_a_kazoo_client(self.client)

            def test_something_else(self):
                something_that_needs_zk_servers(self.servers)

    """

    def __init__(self, *args, **kw):
        super(KazooTestHarness, self).__init__(*args, **kw)
        # Primary client; assigned by setup_zookeeper() and reset to None
        # by teardown_zookeeper().
        self.client = None
        # Every client created through the _get_*client helpers, so that
        # teardown_zookeeper() can stop and close all of them.
        self._clients = []

    @property
    def cluster(self):
        """The cluster object returned by :func:`get_global_cluster`."""
        return get_global_cluster()

    @property
    def servers(self):
        """Comma-separated ``address`` values of all cluster members."""
        return ",".join(s.address for s in self.cluster)

    def _get_nonchroot_client(self):
        """Create a :class:`KazooClient` for ``self.servers``.

        Unlike :meth:`_get_client`, the connection string carries no
        namespace suffix. The client is recorded in ``self._clients`` so
        that :meth:`teardown_zookeeper` cleans it up.
        """
        c = KazooClient(self.servers)
        self._clients.append(c)
        return c

    def _get_client(self, **kwargs):
        """Create a :class:`KazooClient` for ``self.hosts``.

        ``self.hosts`` is assigned by :meth:`setup_zookeeper`, so that
        method has to run first. Keyword arguments are forwarded to
        :class:`KazooClient`. The client is recorded in ``self._clients``
        so that :meth:`teardown_zookeeper` cleans it up.
        """
        c = KazooClient(self.hosts, **kwargs)
        self._clients.append(c)
        return c

    def lose_connection(self, event_factory):
        """Force client to lose connection with server

        :param event_factory: Zero-argument callable returning an event
                              object that provides ``set()``, ``is_set()``
                              and ``wait(timeout)``.
        :raises Exception: if the ``SUSPENDED`` state or the following
                           reconnect is not observed in time.
        """
        self.__break_connection(_CONNECTION_DROP, KazooState.SUSPENDED,
                                event_factory)

    def expire_session(self, event_factory):
        """Force ZK to expire a client session

        :param event_factory: Zero-argument callable returning an event
                              object that provides ``set()``, ``is_set()``
                              and ``wait(timeout)``.
        :raises Exception: if the ``LOST`` state or the following
                           reconnect is not observed in time.
        """
        self.__break_connection(_SESSION_EXPIRED, KazooState.LOST,
                                event_factory)

    def setup_zookeeper(self, **client_options):
        """Create a ZK cluster and chrooted :class:`KazooClient`

        The cluster will only be created on the first invocation and won't be
        fully torn down until exit.

        :param client_options: Keyword arguments forwarded to
                               :class:`KazooClient`. ``timeout`` defaults
                               to ``0.8`` when not supplied.
        """
        # Start the cluster if at least one of its members is not running.
        if any(not s.running for s in self.cluster):
            self.cluster.start()

        # A fresh random namespace is appended to the host string on
        # every call.
        namespace = "/kazootests" + uuid.uuid4().hex
        self.hosts = self.servers + namespace

        client_options.setdefault('timeout', 0.8)
        self.client = self._get_client(**client_options)
        self.client.start()
        # NOTE(review): presumably this creates the namespace node the
        # client is rooted at -- confirm against KazooClient.ensure_path.
        self.client.ensure_path("/")

    def teardown_zookeeper(self):
        """Reset and cleanup the zookeeper cluster that was started.

        Stops and closes every client recorded in ``self._clients`` and
        resets ``self.client`` to ``None``. A :class:`KazooException`
        raised while stopping a client is logged and the client is still
        closed.
        """
        while self._clients:
            c = self._clients.pop()
            try:
                c.stop()
            except KazooException:
                log.exception("Failed stopping client %s", c)
            finally:
                c.close()
        self.client = None

    def __break_connection(self, break_event, expected_state, event_factory):
        """Break ZooKeeper connection using the specified event.

        :param break_event: Value handed to ``self.client._call`` to
                            trigger the breakage.
        :param expected_state: Client state that signals the breakage was
                               noticed.
        :param event_factory: Zero-argument callable returning an event
                              object that provides ``set()``, ``is_set()``
                              and ``wait(timeout)``.
        :raises Exception: if ``expected_state`` is not seen within 5
                           seconds, or ``CONNECTED`` is not seen again
                           within a further 15 seconds.
        """

        lost = event_factory()
        safe = event_factory()

        def watch_loss(state):
            if state == expected_state:
                lost.set()
            elif lost.is_set() and state == KazooState.CONNECTED:
                safe.set()
                # NOTE(review): returning True presumably tells the client
                # to drop this listener -- confirm against
                # KazooClient.add_listener.
                return True

        self.client.add_listener(watch_loss)
        self.client._call(break_event, None)

        # Use is_set() throughout: it matches the listener above, and the
        # camelCase isSet() alias is deprecated on threading.Event.
        lost.wait(5)
        if not lost.is_set():
            raise Exception("Failed to get notified of broken connection.")

        safe.wait(15)
        if not safe.is_set():
            raise Exception("Failed to see client reconnect.")

        # Issue a read through the client's retry helper before returning.
        self.client.retry(self.client.get_async, '/')
164
165
class KazooTestCase(KazooTestHarness):
    """:class:`KazooTestHarness` wired into the unittest fixture hooks.

    ``setUp`` runs :meth:`setup_zookeeper` with no client options and
    ``tearDown`` runs :meth:`teardown_zookeeper`.
    """
    def setUp(self):
        # Runs before each test method.
        self.setup_zookeeper()

    def tearDown(self):
        # Runs after each test method whose setUp succeeded.
        self.teardown_zookeeper()
172