1# This file is part of Buildbot.  Buildbot is free software: you can
2# redistribute it and/or modify it under the terms of the GNU General Public
3# License as published by the Free Software Foundation, version 2.
4#
5# This program is distributed in the hope that it will be useful, but WITHOUT
6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8# details.
9#
10# You should have received a copy of the GNU General Public License along with
11# this program; if not, write to the Free Software Foundation, Inc., 51
12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13#
14# Copyright Buildbot Team Members
15
16
17from twisted.internet import defer
18from twisted.python import log
19
20from buildbot.util import Notifier
21
22
23class SubscriptionPoint:
24
25    def __init__(self, name):
26        self.name = name
27        self.subscriptions = set()
28        self._unfinished_deliveries = []
29        self._unfinished_notifier = Notifier()
30        self._got_exceptions = []
31
32    def __str__(self):
33        return "<SubscriptionPoint '{}'>".format(self.name)
34
35    def subscribe(self, callback):
36        sub = Subscription(self, callback)
37        self.subscriptions.add(sub)
38        return sub
39
40    def deliver(self, *args, **kwargs):
41        self._unfinished_deliveries.append(self)
42        for sub in list(self.subscriptions):
43            try:
44                d = sub.callback(*args, **kwargs)
45                if isinstance(d, defer.Deferred):
46                    self._unfinished_deliveries.append(d)
47                    d.addErrback(self._notify_delivery_exception, sub)
48                    d.addBoth(self._notify_delivery_finished, d)
49
50            except Exception as e:
51                self._notify_delivery_exception(e, sub)
52
53        self._notify_delivery_finished(None, self)
54
55    def waitForDeliveriesToFinish(self):
56        # returns a deferred
57        if not self._unfinished_deliveries:
58            return defer.succeed(None)
59        return self._unfinished_notifier.wait()
60
61    def pop_exceptions(self):
62        exceptions = self._got_exceptions
63        self._got_exceptions = None
64        return exceptions
65
66    def _unsubscribe(self, subscription):
67        self.subscriptions.remove(subscription)
68
69    def _notify_delivery_exception(self, e, sub):
70        log.err(e, 'while invoking callback {} to {}'.format(sub.callback, self))
71        self._got_exceptions.append(e)
72
73    def _notify_delivery_finished(self, _, d):
74        self._unfinished_deliveries.remove(d)
75        if not self._unfinished_deliveries:
76            self._unfinished_notifier.notify(None)
77
78
79class Subscription:
80
81    def __init__(self, subpt, callback):
82        self.subpt = subpt
83        self.callback = callback
84
85    def unsubscribe(self):
86        self.subpt._unsubscribe(self)
87