1# This file is part of Buildbot.  Buildbot is free software: you can
2# redistribute it and/or modify it under the terms of the GNU General Public
3# License as published by the Free Software Foundation, version 2.
4#
5# This program is distributed in the hope that it will be useful, but WITHOUT
6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8# details.
9#
10# You should have received a copy of the GNU General Public License along with
11# this program; if not, write to the Free Software Foundation, Inc., 51
12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13#
14# Copyright Buildbot Team Members
15
16
17from twisted.internet import defer
18from twisted.python import failure
19from twisted.python import log
20
21from buildbot.util import deferwaiter
22from buildbot.util import service
23
24
class MQBase(service.AsyncService):
    """Common base for message-queue service implementations.

    Keeps a ``DeferWaiter`` to which the results of consumer callbacks are
    added (see ``invokeQref``); ``stopService`` waits on it before stopping
    the service.  ``startConsuming`` is used by ``waitUntilEvent`` but is not
    defined in this class -- it is expected to be supplied by the concrete
    implementation.
    """

    name = 'mq-implementation'

    def __init__(self):
        super().__init__()
        # Collects whatever the consumer callbacks return, so that
        # stopService can wait on it.
        self._deferwaiter = deferwaiter.DeferWaiter()

    @defer.inlineCallbacks
    def stopService(self):
        """Wait on the DeferWaiter, then stop the service itself."""
        yield self._deferwaiter.wait()
        yield super().stopService()

    @defer.inlineCallbacks
    def waitUntilEvent(self, filter, check_callback):
        """Wait for the first message matching ``filter``, unless already done.

        A consumer is registered first, and only then is ``check_callback``
        called, so a message sent while the check is running is not missed.

        :param filter: passed unchanged to ``self.startConsuming``.
        :param check_callback: zero-argument callable, may return a Deferred.
            A truthy result means there is nothing to wait for.
        :returns: (via Deferred) ``None`` when the check was truthy, otherwise
            the ``(key, value)`` tuple of the first matching message.
        """
        d = defer.Deferred()

        def on_message(key, value):
            # A Deferred can only fire once.  Further matching messages that
            # arrive before the consumer is stopped are ignored instead of
            # raising AlreadyCalledError inside the consumer callback.
            if not d.called:
                d.callback((key, value))

        consumer = yield self.startConsuming(on_message, filter)
        try:
            check = yield check_callback()
            # we only wait for a message if the check callback returns false
            if not check:
                res = yield d
            else:
                res = None
        except Exception:
            # Do not leave the consumer registered when the check (or the
            # wait) fails.  An except clause is used rather than finally:
            # yielding while the generator is being closed is an error.
            yield consumer.stopConsuming()
            raise
        yield consumer.stopConsuming()
        return res

    def invokeQref(self, qref, routingKey, data):
        """Invoke ``qref`` with the message and track the result.

        Whatever ``qref.invoke`` returns is handed to the DeferWaiter, which
        ``stopService`` waits on.
        """
        self._deferwaiter.add(qref.invoke(routingKey, data))
54
55
class QueueRef:
    """Reference to a single consumer callback.

    ``invoke`` calls the stored callback with a routing key and data, logging
    any failure instead of propagating it.  A falsy ``callback`` makes
    ``invoke`` a no-op.
    """

    __slots__ = ['callback']

    def __init__(self, callback):
        self.callback = callback

    def invoke(self, routing_key, data):
        """Call the callback with ``(routing_key, data)``.

        Returns whatever the callback returned (possibly a Deferred), or
        ``None`` when there is no callback or the callback raised.
        """
        handler = self.callback
        if handler:
            try:
                outcome = handler(routing_key, data)
            except Exception:
                # Synchronous failure: log it and fall through to return None.
                log.err(failure.Failure(), 'while invoking %r' % (self.callback,))
            else:
                if isinstance(outcome, defer.Deferred):
                    # Asynchronous failure: log it when the Deferred errbacks.
                    outcome.addErrback(log.err, 'while invoking %r' % (self.callback,))
                return outcome
        return None

    def stopConsuming(self):
        """Stop delivering messages; must be overridden by subclasses.

        Implementations may return a Deferred and should set
        ``self.callback`` to ``None``.
        """
        raise NotImplementedError
81