1# 2# Copyright (c) ZeroC, Inc. All rights reserved. 3# 4 5import sys, os 6from Util import * 7from Component import component 8from IceBoxUtil import * 9 10class IceStorm(ProcessFromBinDir, Server): 11 12 def __init__(self, instanceName="IceStorm", replica=0, nreplicas=0, transient=False, portnum=0, 13 createDb=True, cleanDb=True, *args, **kargs): 14 Server.__init__(self, exe="icebox", ready="IceStorm", mapping=Mapping.getByName("cpp"), *args, **kargs) 15 self.portnum = portnum 16 self.replica = replica 17 self.nreplicas = nreplicas 18 self.transient = transient 19 self.instanceName = instanceName 20 self.createDb = createDb 21 self.cleanDb = cleanDb 22 self.desc = self.instanceName if self.nreplicas == 0 else "{0} replica #{1}".format(self.instanceName, 23 self.replica) 24 25 def setup(self, current): 26 # Create the database directory 27 if self.createDb: 28 self.dbdir = os.path.join(current.testsuite.getPath(), "{0}-{1}.db".format(self.instanceName, self.replica)) 29 if os.path.exists(self.dbdir): 30 shutil.rmtree(self.dbdir) 31 os.mkdir(self.dbdir) 32 33 def teardown(self, current, success): 34 if self.cleanDb: 35 # Remove the database directory tree 36 try: 37 shutil.rmtree(self.dbdir) 38 except: 39 pass 40 41 def getProps(self, current): 42 props = Server.getProps(self, current) 43 44 # Default properties 45 props.update({ 46 'IceBox.Service.IceStorm' : 'IceStormService,' + component.getSoVersion() + ':createIceStorm', 47 'IceBox.PrintServicesReady' : 'IceStorm', 48 'IceBox.InheritProperties' : 1, 49 'IceStorm.InstanceName' : self.instanceName, 50 'Ice.Admin.InstanceName' : 'IceBox', 51 'Ice.Warn.Dispatch' : 0, 52 'Ice.Warn.Connections' : 0, 53 'IceStorm.LMDB.MapSize' : 1, 54 'IceStorm.LMDB.Path' : '{testdir}/{process.instanceName}-{process.replica}.db', 55 }) 56 57 if self.nreplicas > 0: 58 props['IceStorm.NodeId'] = self.replica 59 60 if self.transient: 61 props["IceStorm.Transient"] = 1 62 63 # 64 # Add endpoint properties here as these properties depend on the worker thread running the 65 # the test case for the port number. The port number is computed by the driver based on a 66 # fixed portnum index for each IceStorm endpoint (portnum = 0 for the topic manager endpoint, 67 # portnum=1 for the publish endpoint, portnum=2 for the node endpoint and portnum=3 for the 68 # icebox admin endpoint). 69 # 70 71 # Manager, publish, node and admin endpoints for given replica number 72 manager = lambda replica: current.getTestEndpoint(self.portnum + replica * 4 + 0) 73 publish = lambda replica: "{0}:{1}".format(current.getTestEndpoint(self.portnum + replica * 4 + 1), 74 current.getTestEndpoint(self.portnum + replica * 4 + 1, "udp")) 75 node = lambda replica: current.getTestEndpoint(self.portnum + replica * 4 + 2) 76 admin = lambda replica: current.getTestEndpoint(self.portnum + replica * 4 + 3) 77 78 # The endpoints for the given replica 79 props.update({ 80 "IceStorm.TopicManager.Endpoints" : manager(self.replica), 81 "IceStorm.Publish.Endpoints" : publish(self.replica), 82 "Ice.Admin.Endpoints" : admin(self.replica), 83 }) 84 85 # Compute the node and replicated endpoints to be configured for each replica 86 if self.nreplicas > 0: 87 props['IceStorm.Node.Endpoints'] = node(self.replica) 88 for i in range(0, self.nreplicas): 89 props["IceStorm.Nodes.{0}".format(i)] = "{2}/node{0}:{1}".format(i, node(i), self.instanceName) 90 props['IceStorm.ReplicatedTopicManagerEndpoints'] = ":".join([manager(i) for i in range(0, self.nreplicas)]) 91 props['IceStorm.ReplicatedPublishEndpoints'] = ":".join([publish(i) for i in range(0, self.nreplicas)]) 92 93 return props 94 95 def getInstanceName(self): 96 return self.instanceName 97 98 def getTopicManager(self, current): 99 # Return the endpoint for this IceStorm replica 100 return "{1}/TopicManager:{0}".format(current.getTestEndpoint(self.portnum + self.replica * 4), self.instanceName) 101 102 def getReplicatedTopicManager(self, current): 103 # Return the replicated endpoints for IceStorm 104 if self.nreplicas == 0: 105 return self.getTopicManager(current) 106 manager = lambda replica: current.getTestEndpoint(self.portnum + replica * 4) 107 return "{1}/TopicManager:{0}".format(":".join([manager(i) for i in range(0, self.nreplicas)]), self.instanceName) 108 109 def shutdown(self, current): 110 # Shutdown this replica by connecting to the IceBox service manager with iceboxadmin 111 endpoint = current.getTestEndpoint(self.portnum + self.replica * 4 + 3) 112 props = { "IceBoxAdmin.ServiceManager.Proxy" : "IceBox/admin -f IceBox.ServiceManager:" + endpoint } 113 IceBoxAdmin().run(current, props=props, args=['shutdown']) 114 115class IceStormProcess: 116 117 def __init__(self, instanceName=None, instance=None): 118 self.instanceName = instanceName 119 self.instance = instance 120 121 def getProps(self, current): 122 123 # 124 # An IceStorm client is provided with the IceStormAdmin.TopicManager.Default property set 125 # to the "instance" topic manager proxy if "instance" is set. Otherwise, if a single it's 126 # set to the replicated topic manager if a specific "instance name" is provided or there's 127 # only one IceStorm instance name deployed. If IceStorm multiple instance names are set, 128 # the client is given an IceStormAdmin.<instance name> property for each instance containing 129 # the replicated topic manager proxy. 130 # 131 132 props = self.getParentProps(current) 133 testcase = current.testcase 134 while testcase and not isinstance(testcase, IceStormTestCase): testcase = testcase.parent 135 if self.instance: 136 props["IceStormAdmin.TopicManager.Default"] = self.instance.getTopicManager(current) 137 else: 138 instanceNames = [self.instanceName] if self.instanceName else testcase.getInstanceNames() 139 if len(instanceNames) == 1: 140 props["IceStormAdmin.TopicManager.Default"] = testcase.getTopicManager(current, instanceNames[0]) 141 else: 142 for name in instanceNames: 143 props["IceStormAdmin.TopicManager.{0}".format(name)] = testcase.getTopicManager(current, name) 144 return props 145 146class IceStormAdmin(ProcessFromBinDir, ProcessIsReleaseOnly, IceStormProcess, Client): 147 148 def __init__(self, instanceName=None, instance=None, *args, **kargs): 149 Client.__init__(self, exe="icestormadmin", mapping=Mapping.getByName("cpp"), *args, **kargs) 150 IceStormProcess.__init__(self, instanceName, instance) 151 152 getParentProps = Client.getProps # Used by IceStormProcess to get the client properties 153 154class Subscriber(IceStormProcess, Server): 155 156 processType = "subscriber" 157 158 def __init__(self, instanceName=None, instance=None, *args, **kargs): 159 Server.__init__(self, *args, **kargs) 160 IceStormProcess.__init__(self, instanceName, instance) 161 162 getParentProps = Server.getProps # Used by IceStormProcess to get the server properties 163 164class Publisher(IceStormProcess, Client): 165 166 processType = "publisher" 167 168 def __init__(self, instanceName=None, instance=None, *args, **kargs): 169 Client.__init__(self, *args, **kargs) 170 IceStormProcess.__init__(self, instanceName, instance) 171 172 getParentProps = Client.getProps # Used by IceStormProcess to get the client properties 173 174class IceStormTestCase(TestCase): 175 176 def __init__(self, name, icestorm, *args, **kargs): 177 TestCase.__init__(self, name, *args, **kargs) 178 self.icestorm = icestorm if isinstance(icestorm, list) else [icestorm] 179 180 def init(self, mapping, testsuite): 181 TestCase.init(self, mapping, testsuite) 182 183 # 184 # Add icestorm servers at the beginning of the server list, IceStorm needs to be 185 # started first! 186 # 187 self.servers = self.icestorm + self.servers 188 189 def runWithDriver(self, current): 190 current.driver.runClientServerTestCase(current) 191 192 def startIceStorm(self, current): 193 for icestorm in self.icestorm: 194 icestorm.start(current) 195 196 def stopIceStorm(self, current): 197 self.shutdown(current) 198 for icestorm in self.icestorm: 199 icestorm.stop(current, True) 200 201 def restartIceStorm(self, current): 202 self.stopIceStorm(current) 203 self.startIceStorm(current) 204 205 def shutdown(self, current): 206 for icestorm in self.icestorm: 207 icestorm.shutdown(current) 208 209 def runadmin(self, current, cmd, instanceName=None, instance=None, exitstatus=0, quiet=False): 210 admin = IceStormAdmin(instanceName, instance, args=["-e", cmd], quiet=quiet) 211 admin.run(current, exitstatus=exitstatus) 212 return admin.getOutput(current) 213 214 def getTopicManager(self, current, instanceName=None): 215 if not instanceName: 216 # Return the topic manager proxy from the first IceStorm server 217 return self.icestorm[0].getReplicatedTopicManager(current) 218 219 # 220 # Otherwise, search for an IceStorm server with the given instance 221 # name and return its replicated topic manager proxy 222 # 223 for s in self.icestorm: 224 if s.getInstanceName() == instanceName: 225 return s.getReplicatedTopicManager(current) 226 227 def getInstanceNames(self): 228 # Return the different IceStorm instance names deployed with this 229 # test case 230 names = set() 231 for s in self.icestorm: 232 names.add(s.getInstanceName()) 233 return list(names) 234