# This file is part of Buildbot.  Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members


from twisted.internet import defer

from buildbot import config
from buildbot import interfaces
from buildbot import util
from buildbot.process.results import SUCCESS
from buildbot.process.results import WARNINGS
from buildbot.schedulers import base


class Dependent(base.BaseScheduler):
    """Scheduler that follows the buildsets of another ("upstream") scheduler.

    Every buildset announced as submitted by the upstream scheduler is
    remembered in this scheduler's persistent state under the key
    ``upstream_bsids``.  When such a buildset completes with ``SUCCESS`` or
    ``WARNINGS``, a new buildset with the same sourcestamps is added for this
    scheduler; in all cases the completed buildset is then forgotten.
    """

    compare_attrs = ('upstream_name',)

    def __init__(self, name, upstream, builderNames, **kwargs):
        """Create the scheduler.

        ``name``, ``builderNames`` and ``kwargs`` are passed to the base
        scheduler unchanged.  ``upstream`` must provide ``IScheduler``;
        otherwise a configuration error is reported through ``config.error``.
        """
        super().__init__(name, builderNames, **kwargs)
        if not interfaces.IScheduler.providedBy(upstream):
            config.error(
                "upstream must be another Scheduler instance")
        # NOTE(review): config.error() presumably can record the problem
        # instead of raising (verify against buildbot.config); in that case
        # an arbitrary object has no 'name', so do not turn the clear config
        # error above into an AttributeError here.
        self.upstream_name = getattr(upstream, 'name', None)
        self._buildset_new_consumer = None
        self._buildset_complete_consumer = None
        # in-memory copy of the 'upstream_bsids' state; None means "not
        # loaded from the DB yet"
        self._cached_upstream_bsids = None

        # the subscription lock makes sure that we're done inserting a
        # subscription into the DB before registering that the buildset is
        # complete.
        self._subscription_lock = defer.DeferredLock()

    @defer.inlineCallbacks
    def activate(self):
        """Start consuming buildset messages and catch up on missed ones.

        Does nothing beyond the base activation when the scheduler is not
        enabled.
        """
        yield super().activate()

        if not self.enabled:
            return

        self._buildset_new_consumer = yield self.master.mq.startConsuming(
            self._buildset_new_cb,
            ('buildsets', None, 'new'))
        # TODO: refactor to subscribe only to interesting buildsets, and
        # subscribe to them directly, via the data API
        self._buildset_complete_consumer = yield self.master.mq.startConsuming(
            self._buildset_complete_cb,
            ('buildsets', None, 'complete'))

        # check for any buildsets completed before we started
        yield self._checkCompletedBuildsets(None)

    @defer.inlineCallbacks
    def deactivate(self):
        """Stop consuming buildset messages and drop the in-memory cache."""
        # the base deactivate will unsubscribe from new changes
        yield super().deactivate()

        if not self.enabled:
            return

        if self._buildset_new_consumer:
            self._buildset_new_consumer.stopConsuming()
        if self._buildset_complete_consumer:
            self._buildset_complete_consumer.stopConsuming()
        # force a reload from the DB on the next activation
        self._cached_upstream_bsids = None

    @util.deferredLocked('_subscription_lock')
    def _buildset_new_cb(self, key, msg):
        """Handle a 'new buildset' message.

        Buildsets whose ``msg['scheduler']`` is not the upstream scheduler
        are ignored; the others are recorded via ``_addUpstreamBuildset``.
        Guarded by ``_subscription_lock``.
        """
        # check if this was submitted by our upstream
        if msg['scheduler'] != self.upstream_name:
            return None

        # record our interest in this buildset
        return self._addUpstreamBuildset(msg['bsid'])

    def _buildset_complete_cb(self, key, msg):
        """Handle a 'buildset complete' message for ``msg['bsid']``."""
        return self._checkCompletedBuildsets(msg['bsid'])

    @util.deferredLocked('_subscription_lock')
    @defer.inlineCallbacks
    def _checkCompletedBuildsets(self, bsid):
        """Trigger builds for finished upstream buildsets, then forget them.

        ``bsid`` is the id of a buildset known to have just completed, or
        ``None`` when simply scanning all recorded upstream buildsets.
        Guarded by ``_subscription_lock``.
        """
        subs = yield self._getUpstreamBuildsets()

        sub_bsids = []
        for (sub_bsid, sub_ssids, sub_complete, sub_results) in subs:
            # skip incomplete builds, handling the case where the 'complete'
            # column has not been updated yet
            if not sub_complete and sub_bsid != bsid:
                continue

            # build a dependent build if the status is appropriate.  Note that
            # this uses the sourcestamps from the buildset, not from any of the
            # builds performed to complete the buildset (since those might
            # differ from one another)
            if sub_results in (SUCCESS, WARNINGS):
                yield self.addBuildsetForSourceStamps(
                    sourcestamps=sub_ssids.copy(),
                    reason='downstream')

            sub_bsids.append(sub_bsid)

        # and regardless of status, remove the subscriptions
        yield self._removeUpstreamBuildsets(sub_bsids)

    @defer.inlineCallbacks
    def _updateCachedUpstreamBuilds(self):
        """Load ``upstream_bsids`` from the state DB if not cached yet."""
        if self._cached_upstream_bsids is None:
            bsids = yield self.master.db.state.getState(self.objectid,
                                                        'upstream_bsids', [])
            self._cached_upstream_bsids = bsids

    @defer.inlineCallbacks
    def _getUpstreamBuildsets(self):
        """Return ``(bsid, ssids, complete, results)`` for each upstream buildset.

        Recorded bsids for which the data API returns nothing are dropped
        from the cache, and the state DB is updated if any were dropped.
        """
        # get a list of (bsid, ssids, complete, results) for all
        # upstream buildsets
        yield self._updateCachedUpstreamBuilds()

        changed = False
        rv = []
        # iterate over a copy: entries may be removed from the cache below
        for bsid in self._cached_upstream_bsids[:]:
            buildset = yield self.master.data.get(('buildsets', str(bsid)))
            if not buildset:
                self._cached_upstream_bsids.remove(bsid)
                changed = True
                continue

            ssids = [ss['ssid'] for ss in buildset['sourcestamps']]
            rv.append((bsid, ssids, buildset['complete'], buildset['results']))

        if changed:
            yield self.master.db.state.setState(self.objectid,
                                                'upstream_bsids', self._cached_upstream_bsids)

        return rv

    @defer.inlineCallbacks
    def _addUpstreamBuildset(self, bsid):
        """Record ``bsid`` as an upstream buildset and persist the list."""
        yield self._updateCachedUpstreamBuilds()

        if bsid not in self._cached_upstream_bsids:
            self._cached_upstream_bsids.append(bsid)

        yield self.master.db.state.setState(self.objectid,
                                            'upstream_bsids', self._cached_upstream_bsids)

    @defer.inlineCallbacks
    def _removeUpstreamBuildsets(self, bsids):
        """Forget every buildset id in ``bsids`` and persist the list."""
        yield self._updateCachedUpstreamBuilds()

        finished = set(bsids)
        # dict.fromkeys() de-duplicates like the former set arithmetic did,
        # but keeps the recorded order so the persisted list is deterministic
        self._cached_upstream_bsids = list(dict.fromkeys(
            known for known in self._cached_upstream_bsids
            if known not in finished))

        yield self.master.db.state.setState(self.objectid,
                                            'upstream_bsids', self._cached_upstream_bsids)