# This file is part of Buildbot.  Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Team Members


import txaio
from autobahn.twisted.wamp import ApplicationSession
from autobahn.twisted.wamp import Service
from autobahn.wamp.exception import TransportLost
from twisted.internet import defer
from twisted.python import failure
from twisted.python import log

from buildbot.util import bytes2unicode
from buildbot.util import service


class MasterService(ApplicationSession, service.AsyncMultiService):

    """
    WAMP application session that aggregates all the wamp services of buildbot.

    The session attaches itself as a child service of the object found in
    ``config.extra['parent']`` and, once joined, hands itself to that parent
    through ``parent.service`` / ``parent.serviceDeferred``.
    """

    def __init__(self, config):
        """
        :param config: session configuration; ``config.extra['parent']`` must
            be the service this session is attached to.
        """
        # Cannot use super() here.
        # We must explicitly call both parent constructors.
        ApplicationSession.__init__(self)
        service.AsyncMultiService.__init__(self)
        self.config = config
        # Set to True by the parent before an intentional shutdown, so that
        # onLeave() does not treat the disconnection as a failure.
        self.leaving = False
        self.setServiceParent(config.extra['parent'])

    @defer.inlineCallbacks
    def onJoin(self, details):
        """
        Register and subscribe this session and each of its child services,
        announce the master on the bus, then publish the session to the parent.
        """
        log.msg("Wamp connection succeed!")
        for handler in [self] + self.services:
            yield self.register(handler)
            yield self.subscribe(handler)
        yield self.publish("org.buildbot.{}.connected".format(self.master.masterid))
        # Make the joined session available to the parent: synchronously via
        # `service`, and to anyone already waiting via `serviceDeferred`.
        self.parent.service = self
        self.parent.serviceDeferred.callback(self)

    @defer.inlineCallbacks
    def onLeave(self, details):
        """
        Stop the whole master on an unexpected disconnection.

        Does nothing when ``self.leaving`` is set, i.e. when the disconnection
        is part of a requested shutdown.
        """
        if self.leaving:
            return

        # XXX We don't handle crossbar reboot, or any other disconnection well.
        # this is a tricky problem, as we would have to reconnect with exponential backoff
        # re-subscribe to subscriptions, queue messages until reconnection.
        # This is quite complicated, and I believe much better handled in autobahn
        # It is possible that such failure is practically non-existent
        # so for now, we just crash the master
        log.msg("Guru meditation! We have been disconnected from wamp server")
        log.msg(
            "We don't know how to recover this without restarting the whole system")
        log.msg(str(details))
        yield self.master.stopService()

    def onUserError(self, e, msg):
        """Log an error raised from user code, together with its message."""
        log.err(e, msg)


def make(config):
    """
    Session factory handed to the wamp ``Service`` as its ``make`` argument.

    :param config: session configuration, or a false value.
    :returns: a new :class:`MasterService` when ``config`` is truthy, otherwise
        a dict describing this WAMPlet.
    """
    if config:
        return MasterService(config)
    # if no config given, return a description of this WAMPlet ..
    return {'label': 'Buildbot master wamplet',
            'description': 'This contains all the wamp methods provided by a buildbot master'}


class WampConnector(service.ReconfigurableServiceMixin, service.AsyncMultiService):
    """
    Service owning the wamp connection of the master.

    The connection is created on the first reconfiguration that selects the
    ``wamp`` mq type; its settings cannot be changed afterwards.
    """

    # Class used to build the underlying wamp application service; kept as a
    # class attribute so that it can be substituted.
    serviceClass = Service
    name = "wamp"

    def __init__(self):
        super().__init__()
        # The serviceClass instance, created on first wamp configuration.
        self.app = None
        # Settings the connection was created with; compared on reconfig.
        self.router_url = None
        self.realm = None
        self.wamp_debug_level = None
        # Fired by MasterService.onJoin() with the joined session.
        self.serviceDeferred = defer.Deferred()
        # The joined MasterService session, or None while not joined yet.
        self.service = None

    def getService(self):
        """
        Return a Deferred firing with the joined wamp session.

        Fires immediately when the session is already available, otherwise
        once ``serviceDeferred`` fires.
        """
        if self.service is not None:
            return defer.succeed(self.service)
        d = defer.Deferred()

        def gotService(session):
            d.callback(session)
            # Pass the session through so later callbacks on the shared
            # serviceDeferred still receive it.
            return session

        self.serviceDeferred.addCallback(gotService)
        return d

    def stopService(self):
        """
        Stop this service and its children.

        Flags the session as leaving first, so that the resulting
        disconnection is not treated as a failure by ``onLeave``.

        :returns: the result of the parent ``stopService()``, so that callers
            can wait for the shutdown to complete.
        """
        if self.service is not None:
            self.service.leaving = True

        # Propagate the parent's result instead of discarding it; dropping it
        # made it impossible for callers to wait for (or see errors from) the
        # shutdown of the child services.
        return super().stopService()

    @defer.inlineCallbacks
    def publish(self, topic, data, options=None):
        """
        Publish ``data`` on ``topic`` once the session is available.

        :returns: (via Deferred) the result of the session's ``publish``, or
            None when the transport was lost (the failure is logged).
        """
        session = yield self.getService()
        try:
            ret = yield session.publish(topic, data, options=options)
        except TransportLost:
            log.err(failure.Failure(), "while publishing event " + topic)
            return None
        return ret

    @defer.inlineCallbacks
    def subscribe(self, callback, topic=None, options=None):
        """
        Subscribe ``callback`` to ``topic`` once the session is available.

        :returns: (via Deferred) the result of the session's ``subscribe``.
        """
        session = yield self.getService()
        ret = yield session.subscribe(callback, topic, options)
        return ret

    @defer.inlineCallbacks
    def reconfigServiceWithBuildbotConfig(self, new_config):
        """
        Create the wamp connection from ``new_config.mq`` on first use.

        Nothing is done when the mq type is not ``wamp`` or when no
        ``router_url`` is configured.

        :raises ValueError: if a connection already exists and the new
            configuration selects another mq type or different wamp settings.
        """
        if new_config.mq.get('type', 'simple') != "wamp":
            if self.app is not None:
                raise ValueError("Cannot use different wamp settings when reconfiguring")
            return

        wamp = new_config.mq
        # log.msg() joins its positional arguments with spaces rather than
        # applying %-formatting, so the message is formatted before logging.
        log.msg("Starting wamp with config: {!r}".format(wamp))
        router_url = wamp.get('router_url', None)
        realm = bytes2unicode(wamp.get('realm', 'buildbot'))
        wamp_debug_level = wamp.get('wamp_debug_level', 'error')

        # MQ router can be reconfigured only once. Changes to configuration are not supported.
        # We can't switch realm nor the URL as that would leave transactions in inconsistent state.
        # Implementing reconfiguration just for wamp_debug_level does not seem like a good
        # investment.
        if self.app is not None:
            if self.router_url != router_url or self.realm != realm or \
                    self.wamp_debug_level != wamp_debug_level:
                raise ValueError("Cannot use different wamp settings when reconfiguring")
            return

        if router_url is None:
            return

        self.router_url = router_url
        self.realm = realm
        self.wamp_debug_level = wamp_debug_level

        self.app = self.serviceClass(
            url=self.router_url,
            extra=dict(master=self.master, parent=self),
            realm=realm,
            make=make
        )
        txaio.set_global_log_level(wamp_debug_level)
        yield self.app.setServiceParent(self)
        yield super().reconfigServiceWithBuildbotConfig(new_config)