1# 2# Copyright (C) 2010-2011, 2011 Canonical Ltd. All Rights Reserved 3# 4# This file was originally taken from txzookeeper and modified later. 5# 6# Authors: 7# Kapil Thangavelu and the Kazoo team 8# 9# txzookeeper is free software: you can redistribute it and/or modify 10# it under the terms of the GNU Lesser General Public License as published by 11# the Free Software Foundation, either version 3 of the License, or 12# (at your option) any later version. 13# 14# txzookeeper is distributed in the hope that it will be useful, 15# but WITHOUT ANY WARRANTY; without even the implied warranty of 16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 17# GNU Lesser General Public License for more details. 18# 19# You should have received a copy of the GNU Lesser General Public License 20# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. 21 22 23import code 24from collections import namedtuple 25from glob import glob 26from itertools import chain 27import logging 28import os 29import os.path 30import shutil 31import signal 32import subprocess 33import tempfile 34import traceback 35 36 37log = logging.getLogger(__name__) 38 39 40def debug(sig, frame): 41 """Interrupt running process, and provide a python prompt for 42 interactive debugging.""" 43 d = {'_frame': frame} # Allow access to frame object. 44 d.update(frame.f_globals) # Unless shadowed by global 45 d.update(frame.f_locals) 46 47 i = code.InteractiveConsole(d) 48 message = "Signal recieved : entering python shell.\nTraceback:\n" 49 message += ''.join(traceback.format_stack(frame)) 50 i.interact(message) 51 52 53def listen(): 54 if os.name != 'nt': # SIGUSR1 is not supported on Windows 55 signal.signal(signal.SIGUSR1, debug) # Register handler 56listen() 57 58 59def to_java_compatible_path(path): 60 if os.name == 'nt': 61 path = path.replace('\\', '/') 62 return path 63 64ServerInfo = namedtuple( 65 "ServerInfo", 66 "server_id client_port election_port leader_port admin_port peer_type") 67 68 69class ManagedZooKeeper(object): 70 """Class to manage the running of a ZooKeeper instance for testing. 71 72 Note: no attempt is made to probe the ZooKeeper instance is 73 actually available, or that the selected port is free. In the 74 future, we may want to do that, especially when run in a 75 Hudson/Buildbot context, to ensure more test robustness.""" 76 77 def __init__(self, software_path, server_info, peers=(), classpath=None): 78 """Define the ZooKeeper test instance. 79 80 @param install_path: The path to the install for ZK 81 @param port: The port to run the managed ZK instance 82 """ 83 self.install_path = software_path 84 self._classpath = classpath 85 self.server_info = server_info 86 self.host = "127.0.0.1" 87 self.peers = peers 88 self.working_path = tempfile.mkdtemp() 89 self._running = False 90 91 def run(self): 92 """Run the ZooKeeper instance under a temporary directory. 93 94 Writes ZK log messages to zookeeper.log in the current directory. 95 """ 96 if self.running: 97 return 98 config_path = os.path.join(self.working_path, "zoo.cfg") 99 log_path = os.path.join(self.working_path, "log") 100 log4j_path = os.path.join(self.working_path, "log4j.properties") 101 data_path = os.path.join(self.working_path, "data") 102 103 # various setup steps 104 if not os.path.exists(self.working_path): 105 os.mkdir(self.working_path) 106 if not os.path.exists(log_path): 107 os.mkdir(log_path) 108 if not os.path.exists(data_path): 109 os.mkdir(data_path) 110 111 with open(config_path, "w") as config: 112 config.write(""" 113tickTime=2000 114dataDir=%s 115clientPort=%s 116maxClientCnxns=0 117admin.serverPort=%s 118""" % (to_java_compatible_path(data_path), 119 self.server_info.client_port, 120 self.server_info.admin_port)) # NOQA 121 122 # setup a replicated setup if peers are specified 123 if self.peers: 124 servers_cfg = [] 125 for p in chain((self.server_info,), self.peers): 126 servers_cfg.append("server.%s=localhost:%s:%s:%s" % ( 127 p.server_id, p.leader_port, p.election_port, p.peer_type)) 128 129 with open(config_path, "a") as config: 130 config.write(""" 131initLimit=4 132syncLimit=2 133%s 134peerType=%s 135""" % ("\n".join(servers_cfg), self.server_info.peer_type)) 136 137 # Write server ids into datadir 138 with open(os.path.join(data_path, "myid"), "w") as myid_file: 139 myid_file.write(str(self.server_info.server_id)) 140 141 with open(log4j_path, "w") as log4j: 142 log4j.write(""" 143# DEFAULT: console appender only 144log4j.rootLogger=INFO, ROLLINGFILE 145log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout 146log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n 147log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender 148log4j.appender.ROLLINGFILE.Threshold=DEBUG 149log4j.appender.ROLLINGFILE.File=""" + to_java_compatible_path( # NOQA 150 self.working_path + os.sep + "zookeeper.log\n")) 151 152 args = [ 153 "java", 154 "-cp", self.classpath, 155 156 # "-Dlog4j.debug", 157 "-Dreadonlymode.enabled=true", 158 "-Dzookeeper.log.dir=%s" % log_path, 159 "-Dzookeeper.root.logger=INFO,CONSOLE", 160 "-Dlog4j.configuration=file:%s" % log4j_path, 161 162 # OS X: Prevent java from appearing in menu bar, process dock 163 # and from activation of the main workspace on run. 164 "-Djava.awt.headless=true", 165 166 "org.apache.zookeeper.server.quorum.QuorumPeerMain", 167 config_path, 168 ] 169 self.process = subprocess.Popen(args=args) 170 log.info("Started zookeeper process %s using args %s", 171 self.process.pid, args) 172 self._running = True 173 174 @property 175 def classpath(self): 176 """Get the classpath necessary to run ZooKeeper.""" 177 178 if self._classpath: 179 return self._classpath 180 181 # Two possibilities, as seen in zkEnv.sh: 182 # Check for a release - top-level zookeeper-*.jar? 183 jars = glob((os.path.join( 184 self.install_path, 'zookeeper-*.jar'))) 185 if jars: 186 # Release build (`ant package`) 187 jars.extend(glob(os.path.join( 188 self.install_path, 189 "lib/*.jar"))) 190 # support for different file locations on Debian/Ubuntu 191 jars.extend(glob(os.path.join( 192 self.install_path, 193 "log4j-*.jar"))) 194 jars.extend(glob(os.path.join( 195 self.install_path, 196 "slf4j-api-*.jar"))) 197 jars.extend(glob(os.path.join( 198 self.install_path, 199 "slf4j-log4j-*.jar"))) 200 else: 201 # Development build (plain `ant`) 202 jars = glob((os.path.join( 203 self.install_path, 'build/zookeeper-*.jar'))) 204 jars.extend(glob(os.path.join( 205 self.install_path, 206 "build/lib/*.jar"))) 207 208 return os.pathsep.join(jars) 209 210 @property 211 def address(self): 212 """Get the address of the ZooKeeper instance.""" 213 return "%s:%s" % (self.host, self.client_port) 214 215 @property 216 def running(self): 217 return self._running 218 219 @property 220 def client_port(self): 221 return self.server_info.client_port 222 223 def reset(self): 224 """Stop the zookeeper instance, cleaning out its on disk-data.""" 225 self.stop() 226 shutil.rmtree(os.path.join(self.working_path, "data"), True) 227 os.mkdir(os.path.join(self.working_path, "data")) 228 with open(os.path.join(self.working_path, "data", "myid"), "w") as fh: 229 fh.write(str(self.server_info.server_id)) 230 231 def stop(self): 232 """Stop the Zookeeper instance, retaining on disk state.""" 233 if not self.running: 234 return 235 self.process.terminate() 236 self.process.wait() 237 if self.process.returncode != 0: 238 log.warn("Zookeeper process %s failed to terminate with" 239 " non-zero return code (it terminated with %s return" 240 " code instead)", self.process.pid, 241 self.process.returncode) 242 self._running = False 243 244 def destroy(self): 245 """Stop the ZooKeeper instance and destroy its on disk-state""" 246 # called by at exit handler, reimport to avoid cleanup race. 247 self.stop() 248 249 shutil.rmtree(self.working_path, True) 250 251 252class ZookeeperCluster(object): 253 254 def __init__(self, install_path=None, classpath=None, 255 size=3, port_offset=20000, observer_start_id=-1): 256 self._install_path = install_path 257 self._classpath = classpath 258 self._servers = [] 259 260 # Calculate ports and peer group 261 port = port_offset 262 peers = [] 263 264 for i in range(size): 265 server_id = i + 1 266 if observer_start_id != -1 and server_id >= observer_start_id: 267 peer_type = 'observer' 268 else: 269 peer_type = 'participant' 270 info = ServerInfo(server_id, port, port + 1, port + 2, port + 3, 271 peer_type) 272 peers.append(info) 273 port += 10 274 275 # Instantiate Managed ZK Servers 276 for i in range(size): 277 server_peers = list(peers) 278 server_info = server_peers.pop(i) 279 self._servers.append( 280 ManagedZooKeeper( 281 self._install_path, server_info, server_peers, 282 classpath=self._classpath)) 283 284 def __getitem__(self, k): 285 return self._servers[k] 286 287 def __iter__(self): 288 return iter(self._servers) 289 290 def start(self): 291 # Zookeeper client expresses a preference for either lower ports or 292 # lexicographical ordering of hosts, to ensure that all servers have a 293 # chance to startup, start them in reverse order. 294 for server in reversed(list(self)): 295 server.run() 296 # Giving the servers a moment to start, decreases the overall time 297 # required for a client to successfully connect (2s vs. 4s without 298 # the sleep). 299 import time 300 time.sleep(2) 301 302 def stop(self): 303 for server in self: 304 server.stop() 305 self._servers = [] 306 307 def terminate(self): 308 for server in self: 309 server.destroy() 310 311 def reset(self): 312 for server in self: 313 server.reset() 314