1# This file is part of Buildbot. Buildbot is free software: you can 2# redistribute it and/or modify it under the terms of the GNU General Public 3# License as published by the Free Software Foundation, version 2. 4# 5# This program is distributed in the hope that it will be useful, but WITHOUT 6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 7# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more 8# details. 9# 10# You should have received a copy of the GNU General Public License along with 11# this program; if not, write to the Free Software Foundation, Inc., 51 12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 13# 14# Copyright Buildbot Team Members 15 16 17from twisted.internet import defer 18from twisted.python import failure 19from twisted.python import log 20 21from buildbot.util import deferwaiter 22from buildbot.util import service 23 24 25class MQBase(service.AsyncService): 26 name = 'mq-implementation' 27 28 def __init__(self): 29 super().__init__() 30 self._deferwaiter = deferwaiter.DeferWaiter() 31 32 @defer.inlineCallbacks 33 def stopService(self): 34 yield self._deferwaiter.wait() 35 yield super().stopService() 36 37 @defer.inlineCallbacks 38 def waitUntilEvent(self, filter, check_callback): 39 d = defer.Deferred() 40 buildCompleteConsumer = yield self.startConsuming( 41 lambda key, value: d.callback((key, value)), 42 filter) 43 check = yield check_callback() 44 # we only wait if the check callback return true 45 if not check: 46 res = yield d 47 else: 48 res = None 49 yield buildCompleteConsumer.stopConsuming() 50 return res 51 52 def invokeQref(self, qref, routingKey, data): 53 self._deferwaiter.add(qref.invoke(routingKey, data)) 54 55 56class QueueRef: 57 58 __slots__ = ['callback'] 59 60 def __init__(self, callback): 61 self.callback = callback 62 63 def invoke(self, routing_key, data): 64 # Potentially returns a Deferred 65 if not self.callback: 66 return None 67 68 try: 69 x = self.callback(routing_key, data) 70 except Exception: 71 log.err(failure.Failure(), 'while invoking %r' % (self.callback,)) 72 return None 73 if isinstance(x, defer.Deferred): 74 x.addErrback(log.err, 'while invoking %r' % (self.callback,)) 75 return x 76 77 def stopConsuming(self): 78 # This method may return a Deferred. 79 # subclasses should set self.callback to None in this method. 80 raise NotImplementedError 81