1# This file is part of Buildbot. Buildbot is free software: you can 2# redistribute it and/or modify it under the terms of the GNU General Public 3# License as published by the Free Software Foundation, version 2. 4# 5# This program is distributed in the hope that it will be useful, but WITHOUT 6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 7# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more 8# details. 9# 10# You should have received a copy of the GNU General Public License along with 11# this program; if not, write to the Free Software Foundation, Inc., 51 12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 13# 14# Copyright Buildbot Team Members 15 16 17from twisted.internet import defer 18from twisted.python import log 19 20from buildbot.util import Notifier 21 22 23class SubscriptionPoint: 24 25 def __init__(self, name): 26 self.name = name 27 self.subscriptions = set() 28 self._unfinished_deliveries = [] 29 self._unfinished_notifier = Notifier() 30 self._got_exceptions = [] 31 32 def __str__(self): 33 return "<SubscriptionPoint '{}'>".format(self.name) 34 35 def subscribe(self, callback): 36 sub = Subscription(self, callback) 37 self.subscriptions.add(sub) 38 return sub 39 40 def deliver(self, *args, **kwargs): 41 self._unfinished_deliveries.append(self) 42 for sub in list(self.subscriptions): 43 try: 44 d = sub.callback(*args, **kwargs) 45 if isinstance(d, defer.Deferred): 46 self._unfinished_deliveries.append(d) 47 d.addErrback(self._notify_delivery_exception, sub) 48 d.addBoth(self._notify_delivery_finished, d) 49 50 except Exception as e: 51 self._notify_delivery_exception(e, sub) 52 53 self._notify_delivery_finished(None, self) 54 55 def waitForDeliveriesToFinish(self): 56 # returns a deferred 57 if not self._unfinished_deliveries: 58 return defer.succeed(None) 59 return self._unfinished_notifier.wait() 60 61 def pop_exceptions(self): 62 exceptions = self._got_exceptions 63 self._got_exceptions = None 64 return exceptions 65 66 def _unsubscribe(self, subscription): 67 self.subscriptions.remove(subscription) 68 69 def _notify_delivery_exception(self, e, sub): 70 log.err(e, 'while invoking callback {} to {}'.format(sub.callback, self)) 71 self._got_exceptions.append(e) 72 73 def _notify_delivery_finished(self, _, d): 74 self._unfinished_deliveries.remove(d) 75 if not self._unfinished_deliveries: 76 self._unfinished_notifier.notify(None) 77 78 79class Subscription: 80 81 def __init__(self, subpt, callback): 82 self.subpt = subpt 83 self.callback = callback 84 85 def unsubscribe(self): 86 self.subpt._unsubscribe(self) 87