1#
2#  Copyright (C) 2010-2011, 2011 Canonical Ltd. All Rights Reserved
3#
4#  This file was originally taken from txzookeeper and modified later.
5#
6#  Authors:
7#   Kapil Thangavelu and the Kazoo team
8#
9#  txzookeeper is free software: you can redistribute it and/or modify
10#  it under the terms of the GNU Lesser General Public License as published by
11#  the Free Software Foundation, either version 3 of the License, or
12#  (at your option) any later version.
13#
14#  txzookeeper is distributed in the hope that it will be useful,
15#  but WITHOUT ANY WARRANTY; without even the implied warranty of
16#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17#  GNU Lesser General Public License for more details.
18#
19#  You should have received a copy of the GNU Lesser General Public License
20#  along with txzookeeper.  If not, see <http://www.gnu.org/licenses/>.
21
22
23import code
24from collections import namedtuple
25from glob import glob
26from itertools import chain
27import logging
28import os
29import os.path
30import shutil
31import signal
32import subprocess
33import tempfile
34import traceback
35
36
37log = logging.getLogger(__name__)
38
39
40def debug(sig, frame):
41    """Interrupt running process, and provide a python prompt for
42    interactive debugging."""
43    d = {'_frame': frame}         # Allow access to frame object.
44    d.update(frame.f_globals)  # Unless shadowed by global
45    d.update(frame.f_locals)
46
47    i = code.InteractiveConsole(d)
48    message = "Signal recieved : entering python shell.\nTraceback:\n"
49    message += ''.join(traceback.format_stack(frame))
50    i.interact(message)
51
52
53def listen():
54    if os.name != 'nt':  # SIGUSR1 is not supported on Windows
55        signal.signal(signal.SIGUSR1, debug)  # Register handler
56listen()
57
58
59def to_java_compatible_path(path):
60    if os.name == 'nt':
61        path = path.replace('\\', '/')
62    return path
63
64ServerInfo = namedtuple(
65    "ServerInfo",
66    "server_id client_port election_port leader_port admin_port peer_type")
67
68
69class ManagedZooKeeper(object):
70    """Class to manage the running of a ZooKeeper instance for testing.
71
72    Note: no attempt is made to probe the ZooKeeper instance is
73    actually available, or that the selected port is free. In the
74    future, we may want to do that, especially when run in a
75    Hudson/Buildbot context, to ensure more test robustness."""
76
77    def __init__(self, software_path, server_info, peers=(), classpath=None):
78        """Define the ZooKeeper test instance.
79
80        @param install_path: The path to the install for ZK
81        @param port: The port to run the managed ZK instance
82        """
83        self.install_path = software_path
84        self._classpath = classpath
85        self.server_info = server_info
86        self.host = "127.0.0.1"
87        self.peers = peers
88        self.working_path = tempfile.mkdtemp()
89        self._running = False
90
91    def run(self):
92        """Run the ZooKeeper instance under a temporary directory.
93
94        Writes ZK log messages to zookeeper.log in the current directory.
95        """
96        if self.running:
97            return
98        config_path = os.path.join(self.working_path, "zoo.cfg")
99        log_path = os.path.join(self.working_path, "log")
100        log4j_path = os.path.join(self.working_path, "log4j.properties")
101        data_path = os.path.join(self.working_path, "data")
102
103        # various setup steps
104        if not os.path.exists(self.working_path):
105            os.mkdir(self.working_path)
106        if not os.path.exists(log_path):
107            os.mkdir(log_path)
108        if not os.path.exists(data_path):
109            os.mkdir(data_path)
110
111        with open(config_path, "w") as config:
112            config.write("""
113tickTime=2000
114dataDir=%s
115clientPort=%s
116maxClientCnxns=0
117admin.serverPort=%s
118""" % (to_java_compatible_path(data_path),
119       self.server_info.client_port,
120       self.server_info.admin_port))  # NOQA
121
122        # setup a replicated setup if peers are specified
123        if self.peers:
124            servers_cfg = []
125            for p in chain((self.server_info,), self.peers):
126                servers_cfg.append("server.%s=localhost:%s:%s:%s" % (
127                    p.server_id, p.leader_port, p.election_port, p.peer_type))
128
129            with open(config_path, "a") as config:
130                config.write("""
131initLimit=4
132syncLimit=2
133%s
134peerType=%s
135""" % ("\n".join(servers_cfg), self.server_info.peer_type))
136
137        # Write server ids into datadir
138        with open(os.path.join(data_path, "myid"), "w") as myid_file:
139            myid_file.write(str(self.server_info.server_id))
140
141        with open(log4j_path, "w") as log4j:
142            log4j.write("""
143# DEFAULT: console appender only
144log4j.rootLogger=INFO, ROLLINGFILE
145log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
146log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
147log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
148log4j.appender.ROLLINGFILE.Threshold=DEBUG
149log4j.appender.ROLLINGFILE.File=""" + to_java_compatible_path(  # NOQA
150                self.working_path + os.sep + "zookeeper.log\n"))
151
152        args = [
153            "java",
154            "-cp", self.classpath,
155
156            # "-Dlog4j.debug",
157            "-Dreadonlymode.enabled=true",
158            "-Dzookeeper.log.dir=%s" % log_path,
159            "-Dzookeeper.root.logger=INFO,CONSOLE",
160            "-Dlog4j.configuration=file:%s" % log4j_path,
161
162            # OS X: Prevent java from appearing in menu bar, process dock
163            # and from activation of the main workspace on run.
164            "-Djava.awt.headless=true",
165
166            "org.apache.zookeeper.server.quorum.QuorumPeerMain",
167            config_path,
168        ]
169        self.process = subprocess.Popen(args=args)
170        log.info("Started zookeeper process %s using args %s",
171                 self.process.pid, args)
172        self._running = True
173
174    @property
175    def classpath(self):
176        """Get the classpath necessary to run ZooKeeper."""
177
178        if self._classpath:
179            return self._classpath
180
181        # Two possibilities, as seen in zkEnv.sh:
182        # Check for a release - top-level zookeeper-*.jar?
183        jars = glob((os.path.join(
184            self.install_path, 'zookeeper-*.jar')))
185        if jars:
186            # Release build (`ant package`)
187            jars.extend(glob(os.path.join(
188                self.install_path,
189                "lib/*.jar")))
190            # support for different file locations on Debian/Ubuntu
191            jars.extend(glob(os.path.join(
192                self.install_path,
193                "log4j-*.jar")))
194            jars.extend(glob(os.path.join(
195                self.install_path,
196                "slf4j-api-*.jar")))
197            jars.extend(glob(os.path.join(
198                self.install_path,
199                "slf4j-log4j-*.jar")))
200        else:
201            # Development build (plain `ant`)
202            jars = glob((os.path.join(
203                self.install_path, 'build/zookeeper-*.jar')))
204            jars.extend(glob(os.path.join(
205                self.install_path,
206                "build/lib/*.jar")))
207
208        return os.pathsep.join(jars)
209
210    @property
211    def address(self):
212        """Get the address of the ZooKeeper instance."""
213        return "%s:%s" % (self.host, self.client_port)
214
215    @property
216    def running(self):
217        return self._running
218
219    @property
220    def client_port(self):
221        return self.server_info.client_port
222
223    def reset(self):
224        """Stop the zookeeper instance, cleaning out its on disk-data."""
225        self.stop()
226        shutil.rmtree(os.path.join(self.working_path, "data"), True)
227        os.mkdir(os.path.join(self.working_path, "data"))
228        with open(os.path.join(self.working_path, "data", "myid"), "w") as fh:
229            fh.write(str(self.server_info.server_id))
230
231    def stop(self):
232        """Stop the Zookeeper instance, retaining on disk state."""
233        if not self.running:
234            return
235        self.process.terminate()
236        self.process.wait()
237        if self.process.returncode != 0:
238            log.warn("Zookeeper process %s failed to terminate with"
239                     " non-zero return code (it terminated with %s return"
240                     " code instead)", self.process.pid,
241                     self.process.returncode)
242        self._running = False
243
244    def destroy(self):
245        """Stop the ZooKeeper instance and destroy its on disk-state"""
246        # called by at exit handler, reimport to avoid cleanup race.
247        self.stop()
248
249        shutil.rmtree(self.working_path, True)
250
251
252class ZookeeperCluster(object):
253
254    def __init__(self, install_path=None, classpath=None,
255                 size=3, port_offset=20000, observer_start_id=-1):
256        self._install_path = install_path
257        self._classpath = classpath
258        self._servers = []
259
260        # Calculate ports and peer group
261        port = port_offset
262        peers = []
263
264        for i in range(size):
265            server_id = i + 1
266            if observer_start_id != -1 and server_id >= observer_start_id:
267                peer_type = 'observer'
268            else:
269                peer_type = 'participant'
270            info = ServerInfo(server_id, port, port + 1, port + 2, port + 3,
271                              peer_type)
272            peers.append(info)
273            port += 10
274
275        # Instantiate Managed ZK Servers
276        for i in range(size):
277            server_peers = list(peers)
278            server_info = server_peers.pop(i)
279            self._servers.append(
280                ManagedZooKeeper(
281                    self._install_path, server_info, server_peers,
282                    classpath=self._classpath))
283
284    def __getitem__(self, k):
285        return self._servers[k]
286
287    def __iter__(self):
288        return iter(self._servers)
289
290    def start(self):
291        # Zookeeper client expresses a preference for either lower ports or
292        # lexicographical ordering of hosts, to ensure that all servers have a
293        # chance to startup, start them in reverse order.
294        for server in reversed(list(self)):
295            server.run()
296        # Giving the servers a moment to start, decreases the overall time
297        # required for a client to successfully connect (2s vs. 4s without
298        # the sleep).
299        import time
300        time.sleep(2)
301
302    def stop(self):
303        for server in self:
304            server.stop()
305        self._servers = []
306
307    def terminate(self):
308        for server in self:
309            server.destroy()
310
311    def reset(self):
312        for server in self:
313            server.reset()
314