1#
2# Copyright (c) ZeroC, Inc. All rights reserved.
3#
4
5import sys, os
6from Util import *
7from Component import component
8from IceBoxUtil import *
9
10class IceStorm(ProcessFromBinDir, Server):
11
12    def __init__(self, instanceName="IceStorm", replica=0, nreplicas=0, transient=False, portnum=0,
13                 createDb=True, cleanDb=True, *args, **kargs):
14        Server.__init__(self, exe="icebox", ready="IceStorm", mapping=Mapping.getByName("cpp"), *args, **kargs)
15        self.portnum = portnum
16        self.replica = replica
17        self.nreplicas = nreplicas
18        self.transient = transient
19        self.instanceName = instanceName
20        self.createDb = createDb
21        self.cleanDb = cleanDb
22        self.desc = self.instanceName if self.nreplicas == 0 else "{0} replica #{1}".format(self.instanceName,
23                                                                                            self.replica)
24
25    def setup(self, current):
26        # Create the database directory
27        if self.createDb:
28            self.dbdir = os.path.join(current.testsuite.getPath(), "{0}-{1}.db".format(self.instanceName, self.replica))
29            if os.path.exists(self.dbdir):
30                shutil.rmtree(self.dbdir)
31            os.mkdir(self.dbdir)
32
33    def teardown(self, current, success):
34        if self.cleanDb:
35            # Remove the database directory tree
36            try:
37                shutil.rmtree(self.dbdir)
38            except:
39                pass
40
41    def getProps(self, current):
42        props = Server.getProps(self, current)
43
44        # Default properties
45        props.update({
46            'IceBox.Service.IceStorm' : 'IceStormService,' + component.getSoVersion() + ':createIceStorm',
47            'IceBox.PrintServicesReady' : 'IceStorm',
48            'IceBox.InheritProperties' : 1,
49            'IceStorm.InstanceName' : self.instanceName,
50            'Ice.Admin.InstanceName' : 'IceBox',
51            'Ice.Warn.Dispatch' : 0,
52            'Ice.Warn.Connections' : 0,
53            'IceStorm.LMDB.MapSize' : 1,
54            'IceStorm.LMDB.Path' : '{testdir}/{process.instanceName}-{process.replica}.db',
55        })
56
57        if self.nreplicas > 0:
58            props['IceStorm.NodeId'] = self.replica
59
60        if self.transient:
61            props["IceStorm.Transient"] = 1
62
63        #
64        # Add endpoint properties here as these properties depend on the worker thread running the
65        # the test case for the port number. The port number is computed by the driver based on a
66        # fixed portnum index for each IceStorm endpoint (portnum = 0 for the topic manager endpoint,
67        # portnum=1 for the publish endpoint, portnum=2 for the node endpoint and portnum=3 for the
68        # icebox admin endpoint).
69        #
70
71        # Manager, publish, node and admin endpoints for given replica number
72        manager = lambda replica: current.getTestEndpoint(self.portnum + replica * 4 + 0)
73        publish = lambda replica: "{0}:{1}".format(current.getTestEndpoint(self.portnum + replica * 4 + 1),
74                                                   current.getTestEndpoint(self.portnum + replica * 4 + 1, "udp"))
75        node = lambda replica: current.getTestEndpoint(self.portnum + replica * 4 + 2)
76        admin = lambda replica: current.getTestEndpoint(self.portnum + replica * 4 + 3)
77
78        # The endpoints for the given replica
79        props.update({
80            "IceStorm.TopicManager.Endpoints" : manager(self.replica),
81            "IceStorm.Publish.Endpoints" : publish(self.replica),
82            "Ice.Admin.Endpoints" : admin(self.replica),
83        })
84
85        # Compute the node and replicated endpoints to be configured for each replica
86        if self.nreplicas > 0:
87            props['IceStorm.Node.Endpoints'] = node(self.replica)
88            for i in range(0, self.nreplicas):
89                props["IceStorm.Nodes.{0}".format(i)] = "{2}/node{0}:{1}".format(i, node(i), self.instanceName)
90            props['IceStorm.ReplicatedTopicManagerEndpoints'] = ":".join([manager(i) for i in range(0, self.nreplicas)])
91            props['IceStorm.ReplicatedPublishEndpoints'] = ":".join([publish(i) for i in range(0, self.nreplicas)])
92
93        return props
94
95    def getInstanceName(self):
96        return self.instanceName
97
98    def getTopicManager(self, current):
99        # Return the endpoint for this IceStorm replica
100        return "{1}/TopicManager:{0}".format(current.getTestEndpoint(self.portnum + self.replica * 4), self.instanceName)
101
102    def getReplicatedTopicManager(self, current):
103        # Return the replicated endpoints for IceStorm
104        if self.nreplicas == 0:
105            return self.getTopicManager(current)
106        manager = lambda replica: current.getTestEndpoint(self.portnum + replica * 4)
107        return "{1}/TopicManager:{0}".format(":".join([manager(i) for i in range(0, self.nreplicas)]), self.instanceName)
108
109    def shutdown(self, current):
110        # Shutdown this replica by connecting to the IceBox service manager with iceboxadmin
111        endpoint = current.getTestEndpoint(self.portnum + self.replica * 4 + 3)
112        props = { "IceBoxAdmin.ServiceManager.Proxy" : "IceBox/admin -f IceBox.ServiceManager:" + endpoint }
113        IceBoxAdmin().run(current, props=props, args=['shutdown'])
114
115class IceStormProcess:
116
117    def __init__(self, instanceName=None, instance=None):
118        self.instanceName = instanceName
119        self.instance = instance
120
121    def getProps(self, current):
122
123        #
124        # An IceStorm client is provided with the IceStormAdmin.TopicManager.Default property set
125        # to the "instance" topic manager proxy if "instance" is set. Otherwise, if a single it's
126        # set to the replicated topic manager if a specific "instance name" is provided or there's
127        # only one IceStorm instance name deployed. If IceStorm multiple instance names are set,
128        # the client is given an IceStormAdmin.<instance name> property for each instance containing
129        # the replicated topic manager proxy.
130        #
131
132        props = self.getParentProps(current)
133        testcase = current.testcase
134        while testcase and not isinstance(testcase, IceStormTestCase): testcase = testcase.parent
135        if self.instance:
136            props["IceStormAdmin.TopicManager.Default"] = self.instance.getTopicManager(current)
137        else:
138            instanceNames = [self.instanceName] if self.instanceName else testcase.getInstanceNames()
139            if len(instanceNames) == 1:
140                props["IceStormAdmin.TopicManager.Default"] = testcase.getTopicManager(current, instanceNames[0])
141            else:
142                for name in instanceNames:
143                    props["IceStormAdmin.TopicManager.{0}".format(name)] = testcase.getTopicManager(current, name)
144        return props
145
146class IceStormAdmin(ProcessFromBinDir, ProcessIsReleaseOnly, IceStormProcess, Client):
147
148    def __init__(self, instanceName=None, instance=None, *args, **kargs):
149        Client.__init__(self, exe="icestormadmin", mapping=Mapping.getByName("cpp"), *args, **kargs)
150        IceStormProcess.__init__(self, instanceName, instance)
151
152    getParentProps = Client.getProps # Used by IceStormProcess to get the client properties
153
154class Subscriber(IceStormProcess, Server):
155
156    processType = "subscriber"
157
158    def __init__(self, instanceName=None, instance=None, *args, **kargs):
159        Server.__init__(self, *args, **kargs)
160        IceStormProcess.__init__(self, instanceName, instance)
161
162    getParentProps = Server.getProps # Used by IceStormProcess to get the server properties
163
164class Publisher(IceStormProcess, Client):
165
166    processType = "publisher"
167
168    def __init__(self, instanceName=None, instance=None, *args, **kargs):
169        Client.__init__(self, *args, **kargs)
170        IceStormProcess.__init__(self, instanceName, instance)
171
172    getParentProps = Client.getProps # Used by IceStormProcess to get the client properties
173
174class IceStormTestCase(TestCase):
175
176    def __init__(self, name, icestorm, *args, **kargs):
177        TestCase.__init__(self, name, *args, **kargs)
178        self.icestorm = icestorm if isinstance(icestorm, list) else [icestorm]
179
180    def init(self, mapping, testsuite):
181        TestCase.init(self, mapping, testsuite)
182
183        #
184        # Add icestorm servers at the beginning of the server list, IceStorm needs to be
185        # started first!
186        #
187        self.servers = self.icestorm + self.servers
188
189    def runWithDriver(self, current):
190        current.driver.runClientServerTestCase(current)
191
192    def startIceStorm(self, current):
193        for icestorm in self.icestorm:
194            icestorm.start(current)
195
196    def stopIceStorm(self, current):
197        self.shutdown(current)
198        for icestorm in self.icestorm:
199            icestorm.stop(current, True)
200
201    def restartIceStorm(self, current):
202        self.stopIceStorm(current)
203        self.startIceStorm(current)
204
205    def shutdown(self, current):
206        for icestorm in self.icestorm:
207            icestorm.shutdown(current)
208
209    def runadmin(self, current, cmd, instanceName=None, instance=None, exitstatus=0, quiet=False):
210        admin = IceStormAdmin(instanceName, instance, args=["-e", cmd], quiet=quiet)
211        admin.run(current, exitstatus=exitstatus)
212        return admin.getOutput(current)
213
214    def getTopicManager(self, current, instanceName=None):
215        if not instanceName:
216            # Return the topic manager proxy from the first IceStorm server
217            return self.icestorm[0].getReplicatedTopicManager(current)
218
219        #
220        # Otherwise, search for an IceStorm server with the given instance
221        # name and return its replicated topic manager proxy
222        #
223        for s in self.icestorm:
224            if s.getInstanceName() == instanceName:
225                return s.getReplicatedTopicManager(current)
226
227    def getInstanceNames(self):
228        # Return the different IceStorm instance names deployed with this
229        # test case
230        names = set()
231        for s in self.icestorm:
232            names.add(s.getInstanceName())
233        return list(names)
234