1# This file is part of Buildbot.  Buildbot is free software: you can
2# redistribute it and/or modify it under the terms of the GNU General Public
3# License as published by the Free Software Foundation, version 2.
4#
5# This program is distributed in the hope that it will be useful, but WITHOUT
6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8# details.
9#
10# You should have received a copy of the GNU General Public License along with
11# this program; if not, write to the Free Software Foundation, Inc., 51
12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13#
14# Copyright Buildbot Team Members
15
16
17from twisted.internet import defer
18
19from buildbot import config
20from buildbot import interfaces
21from buildbot import util
22from buildbot.process.results import SUCCESS
23from buildbot.process.results import WARNINGS
24from buildbot.schedulers import base
25
26
27class Dependent(base.BaseScheduler):
28
29    compare_attrs = ('upstream_name',)
30
31    def __init__(self, name, upstream, builderNames, **kwargs):
32        super().__init__(name, builderNames, **kwargs)
33        if not interfaces.IScheduler.providedBy(upstream):
34            config.error(
35                "upstream must be another Scheduler instance")
36        self.upstream_name = upstream.name
37        self._buildset_new_consumer = None
38        self._buildset_complete_consumer = None
39        self._cached_upstream_bsids = None
40
41        # the subscription lock makes sure that we're done inserting a
42        # subscription into the DB before registering that the buildset is
43        # complete.
44        self._subscription_lock = defer.DeferredLock()
45
46    @defer.inlineCallbacks
47    def activate(self):
48        yield super().activate()
49
50        if not self.enabled:
51            return
52
53        self._buildset_new_consumer = yield self.master.mq.startConsuming(
54            self._buildset_new_cb,
55            ('buildsets', None, 'new'))
56        # TODO: refactor to subscribe only to interesting buildsets, and
57        # subscribe to them directly, via the data API
58        self._buildset_complete_consumer = yield self.master.mq.startConsuming(
59            self._buildset_complete_cb,
60            ('buildsets', None, 'complete'))
61
62        # check for any buildsets completed before we started
63        yield self._checkCompletedBuildsets(None, )
64
65    @defer.inlineCallbacks
66    def deactivate(self):
67        # the base deactivate will unsubscribe from new changes
68        yield super().deactivate()
69
70        if not self.enabled:
71            return
72
73        if self._buildset_new_consumer:
74            self._buildset_new_consumer.stopConsuming()
75        if self._buildset_complete_consumer:
76            self._buildset_complete_consumer.stopConsuming()
77        self._cached_upstream_bsids = None
78
79    @util.deferredLocked('_subscription_lock')
80    def _buildset_new_cb(self, key, msg):
81        # check if this was submitted by our upstream
82        if msg['scheduler'] != self.upstream_name:
83            return None
84
85        # record our interest in this buildset
86        return self._addUpstreamBuildset(msg['bsid'])
87
88    def _buildset_complete_cb(self, key, msg):
89        return self._checkCompletedBuildsets(msg['bsid'])
90
91    @util.deferredLocked('_subscription_lock')
92    @defer.inlineCallbacks
93    def _checkCompletedBuildsets(self, bsid):
94        subs = yield self._getUpstreamBuildsets()
95
96        sub_bsids = []
97        for (sub_bsid, sub_ssids, sub_complete, sub_results) in subs:
98            # skip incomplete builds, handling the case where the 'complete'
99            # column has not been updated yet
100            if not sub_complete and sub_bsid != bsid:
101                continue
102
103            # build a dependent build if the status is appropriate.  Note that
104            # this uses the sourcestamps from the buildset, not from any of the
105            # builds performed to complete the buildset (since those might
106            # differ from one another)
107            if sub_results in (SUCCESS, WARNINGS):
108                yield self.addBuildsetForSourceStamps(
109                    sourcestamps=sub_ssids.copy(),
110                    reason='downstream')
111
112            sub_bsids.append(sub_bsid)
113
114        # and regardless of status, remove the subscriptions
115        yield self._removeUpstreamBuildsets(sub_bsids)
116
117    @defer.inlineCallbacks
118    def _updateCachedUpstreamBuilds(self):
119        if self._cached_upstream_bsids is None:
120            bsids = yield self.master.db.state.getState(self.objectid,
121                                                        'upstream_bsids', [])
122            self._cached_upstream_bsids = bsids
123
124    @defer.inlineCallbacks
125    def _getUpstreamBuildsets(self):
126        # get a list of (bsid, ssids, complete, results) for all
127        # upstream buildsets
128        yield self._updateCachedUpstreamBuilds()
129
130        changed = False
131        rv = []
132        for bsid in self._cached_upstream_bsids[:]:
133            buildset = yield self.master.data.get(('buildsets', str(bsid)))
134            if not buildset:
135                self._cached_upstream_bsids.remove(bsid)
136                changed = True
137                continue
138
139            ssids = [ss['ssid'] for ss in buildset['sourcestamps']]
140            rv.append((bsid, ssids, buildset['complete'], buildset['results']))
141
142        if changed:
143            yield self.master.db.state.setState(self.objectid,
144                                                'upstream_bsids', self._cached_upstream_bsids)
145
146        return rv
147
148    @defer.inlineCallbacks
149    def _addUpstreamBuildset(self, bsid):
150        yield self._updateCachedUpstreamBuilds()
151
152        if bsid not in self._cached_upstream_bsids:
153            self._cached_upstream_bsids.append(bsid)
154
155            yield self.master.db.state.setState(self.objectid,
156                                                'upstream_bsids', self._cached_upstream_bsids)
157
158    @defer.inlineCallbacks
159    def _removeUpstreamBuildsets(self, bsids):
160        yield self._updateCachedUpstreamBuilds()
161
162        old = set(self._cached_upstream_bsids)
163        self._cached_upstream_bsids = list(old - set(bsids))
164
165        yield self.master.db.state.setState(self.objectid,
166                                            'upstream_bsids', self._cached_upstream_bsids)
167