#
# Copyright (c) ZeroC, Inc. All rights reserved.
#

import Ice, Test, threading


def test(b):
    """Raise RuntimeError if the condition *b* is falsy."""
    if not b:
        raise RuntimeError('test assertion failed')


class TestFacetI(Test.TestFacet):
    """Test facet servant whose only operation does nothing."""

    def op(self, current = None):
        """No-op operation."""
        return


class RemoteCommunicatorI(Test.RemoteCommunicator, Ice.PropertiesAdminUpdateCallback):
    """Servant wrapping a communicator and recording property updates.

    The instance is also used as a properties-admin update callback: the
    changes passed to updated() are stored and handed out by getChanges().
    """

    def __init__(self, communicator):
        """Store the wrapped *communicator* and set up the update state."""
        self.communicator = communicator
        # Most recent changes passed to updated(); None until the first call.
        self.changes = None
        # True while an update has been recorded but not yet consumed by
        # getChanges().
        self.called = False
        # Guards `changes` and `called`, and signals getChanges().
        self.m = threading.Condition()

    def getAdmin(self, current = None):
        """Return the result of getAdmin() on the wrapped communicator."""
        return self.communicator.getAdmin()

    def getChanges(self, current = None):
        """Wait for updated() to be called, then return the stored changes.

        The `called` flag is reset before returning, so the next invocation
        waits for a new update.
        """
        with self.m:
            #
            # The client calls PropertiesAdmin::setProperties() and then invokes
            # this operation. Since setProperties() is implemented using AMD, the
            # client might receive its reply and then call getChanges() before our
            # updated() method is called. We block here to ensure that updated()
            # gets called before we return the most recent set of changes.
            #
            while not self.called:
                self.m.wait()

            self.called = False

            return self.changes

    def shutdown(self, current = None):
        """Shut down the wrapped communicator."""
        self.communicator.shutdown()

    def waitForShutdown(self, current = None):
        """Block until the wrapped communicator has been shut down."""
        #
        # Note that we are executing in a thread of the *main* communicator,
        # not the one that is being shut down.
        #
        self.communicator.waitForShutdown()

    def destroy(self, current = None):
        """Destroy the wrapped communicator."""
        self.communicator.destroy()

    def updated(self, changes):
        """Record *changes* and wake a thread waiting in getChanges()."""
        with self.m:
            self.changes = changes
            self.called = True
            self.m.notify()


class RemoteCommunicatorFactoryI(Test.RemoteCommunicatorFactory):
    """Factory servant that creates communicators wrapped by RemoteCommunicatorI."""

    def createCommunicator(self, props, current = None):
        """Create a communicator configured with *props* and return a proxy.

        *props* must provide items() yielding (key, value) pairs; each pair is
        set as a property of the new communicator. The RemoteCommunicatorI
        servant for the new communicator is added to current.adapter under a
        UUID identity, and the resulting proxy is returned after an unchecked
        cast to Test.RemoteCommunicatorPrx.
        """
        #
        # Prepare the property set using the given properties.
        #
        init = Ice.InitializationData()
        init.properties = Ice.createProperties()
        for k, v in props.items():
            init.properties.setProperty(k, v)

        #
        # Initialize a new communicator.
        #
        communicator = Ice.initialize(init)

        #
        # Install a custom admin facet.
        #
        communicator.addAdminFacet(TestFacetI(), "TestFacet")

        #
        # The RemoteCommunicator servant also implements PropertiesAdminUpdateCallback.
        # Set the callback on the admin facet.
        #
        servant = RemoteCommunicatorI(communicator)
        admin = communicator.findAdminFacet("Properties")
        # Identity check: only register the callback when the facet exists.
        if admin is not None:
            admin.addUpdateCallback(servant)

        proxy = current.adapter.addWithUUID(servant)
        return Test.RemoteCommunicatorPrx.uncheckedCast(proxy)

    def shutdown(self, current = None):
        """Shut down the communicator that owns the dispatching adapter."""
        current.adapter.getCommunicator().shutdown()