# This file is part of Buildbot.  Buildbot is free software: you can
2# redistribute it and/or modify it under the terms of the GNU General Public
3# License as published by the Free Software Foundation, version 2.
4#
5# This program is distributed in the hope that it will be useful, but WITHOUT
6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8# details.
9#
10# You should have received a copy of the GNU General Public License along with
11# this program; if not, write to the Free Software Foundation, Inc., 51
12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13#
# Copyright Buildbot Team Members
15
16
17import txaio
18from autobahn.twisted.wamp import ApplicationSession
19from autobahn.twisted.wamp import Service
20from autobahn.wamp.exception import TransportLost
21from twisted.internet import defer
22from twisted.python import failure
23from twisted.python import log
24
25from buildbot.util import bytes2unicode
26from buildbot.util import service
27
28
class MasterService(ApplicationSession, service.AsyncMultiService):

    """
    WAMP application session that doubles as a buildbot multi-service.

    Once the session has joined, it registers and subscribes itself and each
    of its child services, publishes a "connected" event, and then hands
    itself to its parent service.
    """

    def __init__(self, config):
        # The two base classes are initialised one by one on purpose:
        # super() cannot be used here, so each constructor is called explicitly.
        ApplicationSession.__init__(self)
        service.AsyncMultiService.__init__(self)
        # Set to True by the owner before a deliberate shutdown so that
        # onLeave() does not treat the disconnection as a failure.
        self.leaving = False
        self.config = config
        parent = config.extra['parent']
        self.setServiceParent(parent)

    @defer.inlineCallbacks
    def onJoin(self, details):
        """Expose every handler over WAMP and announce the session to the parent."""
        log.msg("Wamp connection succeed!")
        handlers = [self] + self.services
        for handler in handlers:
            yield self.register(handler)
            yield self.subscribe(handler)
        connected_topic = "org.buildbot.{}.connected".format(self.master.masterid)
        yield self.publish(connected_topic)
        # Make the session reachable through the parent, then wake up anybody
        # waiting on the parent's deferred.
        connector = self.parent
        connector.service = self
        connector.serviceDeferred.callback(self)

    @defer.inlineCallbacks
    def onLeave(self, details):
        """Stop the master when the session ends without `leaving` being set."""
        if not self.leaving:
            # XXX Crossbar reboots, and disconnections in general, are not handled
            # well. Doing it properly means reconnecting with exponential backoff,
            # re-subscribing to every subscription and queueing messages until the
            # link is back. That is complicated and would be better handled inside
            # autobahn. Such failures may be practically non-existent, so for now
            # the master is simply brought down.
            log.msg("Guru meditation! We have been disconnected from wamp server")
            log.msg("We don't know how to recover this without restarting the whole system")
            log.msg(str(details))
            yield self.master.stopService()

    def onUserError(self, e, msg):
        """Forward an error `e` and its description `msg` to the Twisted log."""
        log.err(e, msg)
73
74
def make(config):
    """Build the WAMP session for `config`, or describe this WAMPlet.

    A truthy `config` yields a new MasterService built from it. A falsy
    `config` yields a dict with the 'label' and 'description' of the WAMPlet.
    """
    if not config:
        # No config given: answer with the description of this WAMPlet.
        wamplet_info = {
            'label': 'Buildbot master wamplet',
            'description': 'This contains all the wamp methods provided by a buildbot master',
        }
        return wamplet_info
    return MasterService(config)
81
82
class WampConnector(service.ReconfigurableServiceMixin, service.AsyncMultiService):
    """Buildbot service that creates the WAMP client and gives access to its session.

    The client (`serviceClass`) is created on the first reconfiguration that
    selects the "wamp" mq type and provides a router URL. The joined session is
    stored in `self.service` by MasterService.onJoin, which also fires
    `self.serviceDeferred`.
    """
    serviceClass = Service
    name = "wamp"

    def __init__(self):
        super().__init__()
        # The serviceClass instance; None until wamp has been configured.
        self.app = None
        # Settings the running client was created with; used to reject changes.
        self.router_url = None
        self.realm = None
        self.wamp_debug_level = None
        # Fired with the session once it has joined (see MasterService.onJoin).
        self.serviceDeferred = defer.Deferred()
        # The joined session, or None while it is not available yet.
        self.service = None

    def getService(self):
        """Return a Deferred that fires with the joined WAMP session.

        If the session is already known the Deferred has already fired;
        otherwise it fires once `serviceDeferred` does.
        """
        if self.service is not None:
            return defer.succeed(self.service)
        d = defer.Deferred()

        @self.serviceDeferred.addCallback
        def gotService(service):
            d.callback(service)
            # Pass the session on so later callbacks on serviceDeferred get it too.
            return service
        return d

    def stopService(self):
        """Mark the session as leaving on purpose and stop this service.

        Returns whatever the parent class' stopService() returns, so callers
        can wait for the shutdown to complete.
        """
        if self.service is not None:
            # Tells MasterService.onLeave that this disconnection is expected.
            self.service.leaving = True

        # The result used to be dropped, which made it impossible for callers
        # to wait for the child services to stop.
        return super().stopService()

    @defer.inlineCallbacks
    def publish(self, topic, data, options=None):
        """Publish `data` on `topic` through the WAMP session.

        Returns the result of the session's publish(), or None when the
        transport was lost (the failure is logged, not raised).
        """
        service = yield self.getService()
        try:
            ret = yield service.publish(topic, data, options=options)
        except TransportLost:
            log.err(failure.Failure(), "while publishing event " + topic)
            return None
        return ret

    @defer.inlineCallbacks
    def subscribe(self, callback, topic=None, options=None):
        """Subscribe `callback` through the WAMP session and return the result."""
        service = yield self.getService()
        ret = yield service.subscribe(callback, topic, options)
        return ret

    @defer.inlineCallbacks
    def reconfigServiceWithBuildbotConfig(self, new_config):
        """Create the WAMP client from `new_config.mq` on first use.

        Nothing happens when the mq type is not "wamp" or no 'router_url' is
        given. Once the client exists its settings are frozen.

        Raises:
            ValueError: the client already exists and the new configuration
                either is not of type "wamp" or changes the router URL, the
                realm or the debug level.
        """
        if new_config.mq.get('type', 'simple') != "wamp":
            if self.app is not None:
                raise ValueError("Cannot use different wamp settings when reconfiguring")
            return

        wamp = new_config.mq
        # log.msg() joins its positional arguments instead of %-interpolating
        # them, so the message has to be formatted before it is passed in.
        log.msg("Starting wamp with config: %r" % (wamp,))
        router_url = wamp.get('router_url', None)
        realm = bytes2unicode(wamp.get('realm', 'buildbot'))
        wamp_debug_level = wamp.get('wamp_debug_level', 'error')

        # MQ router can be reconfigured only once. Changes to configuration are not supported.
        # We can't switch realm nor the URL as that would leave transactions in inconsistent state.
        # Implementing reconfiguration just for wamp_debug_level does not seem like a good
        # investment.
        if self.app is not None:
            if self.router_url != router_url or self.realm != realm or \
                    self.wamp_debug_level != wamp_debug_level:
                raise ValueError("Cannot use different wamp settings when reconfiguring")
            return

        if router_url is None:
            return

        self.router_url = router_url
        self.realm = realm
        self.wamp_debug_level = wamp_debug_level

        self.app = self.serviceClass(
            url=self.router_url,
            extra=dict(master=self.master, parent=self),
            realm=realm,
            make=make
        )
        txaio.set_global_log_level(wamp_debug_level)
        yield self.app.setServiceParent(self)
        yield super().reconfigServiceWithBuildbotConfig(new_config)
168