1# This file is part of Buildbot. Buildbot is free software: you can 2# redistribute it and/or modify it under the terms of the GNU General Public 3# License as published by the Free Software Foundation, version 2. 4# 5# This program is distributed in the hope that it will be useful, but WITHOUT 6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 7# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more 8# details. 9# 10# You should have received a copy of the GNU General Public License along with 11# this program; if not, write to the Free Software Foundation, Inc., 51 12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 13# 14# Copyright Buildbot Team Members 15 16from twisted.internet import defer 17from twisted.python import failure 18from twisted.python import log 19from zope.interface import implementer 20 21from buildbot import config 22from buildbot import interfaces 23from buildbot.changes import changes 24from buildbot.process.properties import Properties 25from buildbot.util.service import ClusteredBuildbotService 26from buildbot.util.state import StateMixin 27 28 29@implementer(interfaces.IScheduler) 30class BaseScheduler(ClusteredBuildbotService, StateMixin): 31 32 DEFAULT_CODEBASES = {'': {}} 33 34 compare_attrs = ClusteredBuildbotService.compare_attrs + \ 35 ('builderNames', 'properties', 'codebases') 36 37 def __init__(self, name, builderNames, properties=None, 38 codebases=DEFAULT_CODEBASES): 39 super().__init__(name=name) 40 41 ok = True 42 if interfaces.IRenderable.providedBy(builderNames): 43 pass 44 elif isinstance(builderNames, (list, tuple)): 45 for b in builderNames: 46 if not isinstance(b, str) and \ 47 not interfaces.IRenderable.providedBy(b): 48 ok = False 49 else: 50 ok = False 51 if not ok: 52 config.error( 53 "The builderNames argument to a scheduler must be a list " 54 "of Builder names or an IRenderable object that will render" 55 "to a list of builder names.") 56 57 self.builderNames = builderNames 58 59 if properties is None: 60 properties = {} 61 self.properties = Properties() 62 self.properties.update(properties, "Scheduler") 63 self.properties.setProperty("scheduler", name, "Scheduler") 64 self.objectid = None 65 66 # Set the codebases that are necessary to process the changes 67 # These codebases will always result in a sourcestamp with or without 68 # changes 69 known_keys = set(['branch', 'repository', 'revision']) 70 if codebases is None: 71 config.error("Codebases cannot be None") 72 elif isinstance(codebases, list): 73 codebases = dict((codebase, {}) for codebase in codebases) 74 elif not isinstance(codebases, dict): 75 config.error( 76 "Codebases must be a dict of dicts, or list of strings") 77 else: 78 for codebase, attrs in codebases.items(): 79 if not isinstance(attrs, dict): 80 config.error("Codebases must be a dict of dicts") 81 else: 82 unk = set(attrs) - known_keys 83 if unk: 84 config.error("Unknown codebase keys {} for codebase {}".format( 85 ', '.join(unk), codebase)) 86 87 self.codebases = codebases 88 89 # internal variables 90 self._change_consumer = None 91 self._enable_consumer = None 92 self._change_consumption_lock = defer.DeferredLock() 93 94 self.enabled = True 95 96 def reconfigService(self, *args, **kwargs): 97 raise NotImplementedError() 98 99 # activity handling 100 @defer.inlineCallbacks 101 def activate(self): 102 if not self.enabled: 103 return None 104 105 # even if we aren't called via _activityPoll(), at this point we 106 # need to ensure the service id is set correctly 107 if self.serviceid is None: 108 self.serviceid = yield self._getServiceId() 109 assert self.serviceid is not None 110 111 schedulerData = yield self._getScheduler(self.serviceid) 112 113 if schedulerData: 114 self.enabled = schedulerData['enabled'] 115 116 if not self._enable_consumer: 117 yield self.startConsumingEnableEvents() 118 return None 119 120 def _enabledCallback(self, key, msg): 121 if msg['enabled']: 122 self.enabled = True 123 d = self.activate() 124 else: 125 d = self.deactivate() 126 127 def fn(x): 128 self.enabled = False 129 130 d.addCallback(fn) 131 return d 132 133 @defer.inlineCallbacks 134 def deactivate(self): 135 if not self.enabled: 136 return None 137 yield self._stopConsumingChanges() 138 return None 139 140 # service handling 141 142 def _getServiceId(self): 143 return self.master.data.updates.findSchedulerId(self.name) 144 145 def _getScheduler(self, sid): 146 return self.master.db.schedulers.getScheduler(sid) 147 148 def _claimService(self): 149 return self.master.data.updates.trySetSchedulerMaster(self.serviceid, 150 self.master.masterid) 151 152 def _unclaimService(self): 153 return self.master.data.updates.trySetSchedulerMaster(self.serviceid, 154 None) 155 156 # status queries 157 158 # deprecated: these aren't compatible with distributed schedulers 159 160 def listBuilderNames(self): 161 return self.builderNames 162 163 # change handling 164 165 @defer.inlineCallbacks 166 def startConsumingChanges(self, fileIsImportant=None, change_filter=None, 167 onlyImportant=False): 168 assert fileIsImportant is None or callable(fileIsImportant) 169 170 # register for changes with the data API 171 assert not self._change_consumer 172 self._change_consumer = yield self.master.mq.startConsuming( 173 lambda k, m: self._changeCallback(k, m, fileIsImportant, 174 change_filter, onlyImportant), 175 ('changes', None, 'new')) 176 177 @defer.inlineCallbacks 178 def startConsumingEnableEvents(self): 179 assert not self._enable_consumer 180 self._enable_consumer = yield self.master.mq.startConsuming( 181 self._enabledCallback, 182 ('schedulers', str(self.serviceid), 'updated')) 183 184 @defer.inlineCallbacks 185 def _changeCallback(self, key, msg, fileIsImportant, change_filter, 186 onlyImportant): 187 188 # ignore changes delivered while we're not running 189 if not self._change_consumer: 190 return 191 192 # get a change object, since the API requires it 193 chdict = yield self.master.db.changes.getChange(msg['changeid']) 194 change = yield changes.Change.fromChdict(self.master, chdict) 195 196 # filter it 197 if change_filter and not change_filter.filter_change(change): 198 return 199 if change.codebase not in self.codebases: 200 log.msg(format='change contains codebase %(codebase)s that is ' 201 'not processed by scheduler %(name)s', 202 codebase=change.codebase, name=self.name) 203 return 204 205 if fileIsImportant: 206 try: 207 important = fileIsImportant(change) 208 if not important and onlyImportant: 209 return 210 except Exception: 211 log.err(failure.Failure(), 'in fileIsImportant check for {}'.format(change)) 212 return 213 else: 214 important = True 215 216 # use change_consumption_lock to ensure the service does not stop 217 # while this change is being processed 218 d = self._change_consumption_lock.run( 219 self.gotChange, change, important) 220 d.addErrback(log.err, 'while processing change') 221 222 def _stopConsumingChanges(self): 223 # (note: called automatically in deactivate) 224 225 # acquire the lock change consumption lock to ensure that any change 226 # consumption is complete before we are done stopping consumption 227 def stop(): 228 if self._change_consumer: 229 self._change_consumer.stopConsuming() 230 self._change_consumer = None 231 return self._change_consumption_lock.run(stop) 232 233 def gotChange(self, change, important): 234 raise NotImplementedError 235 236 # starting builds 237 238 @defer.inlineCallbacks 239 def addBuildsetForSourceStampsWithDefaults(self, reason, sourcestamps=None, 240 waited_for=False, properties=None, builderNames=None, 241 **kw): 242 if sourcestamps is None: 243 sourcestamps = [] 244 245 # convert sourcestamps to a dictionary keyed by codebase 246 stampsByCodebase = {} 247 for ss in sourcestamps: 248 cb = ss['codebase'] 249 if cb in stampsByCodebase: 250 raise RuntimeError("multiple sourcestamps with same codebase") 251 stampsByCodebase[cb] = ss 252 253 # Merge codebases with the passed list of sourcestamps 254 # This results in a new sourcestamp for each codebase 255 stampsWithDefaults = [] 256 for codebase in self.codebases: 257 cb = yield self.getCodebaseDict(codebase) 258 ss = { 259 'codebase': codebase, 260 'repository': cb.get('repository', ''), 261 'branch': cb.get('branch', None), 262 'revision': cb.get('revision', None), 263 'project': '', 264 } 265 # apply info from passed sourcestamps onto the configured default 266 # sourcestamp attributes for this codebase. 267 ss.update(stampsByCodebase.get(codebase, {})) 268 stampsWithDefaults.append(ss) 269 270 # fill in any supplied sourcestamps that aren't for a codebase in the 271 # scheduler's codebase dictionary 272 for codebase in set(stampsByCodebase) - set(self.codebases): 273 cb = stampsByCodebase[codebase] 274 ss = { 275 'codebase': codebase, 276 'repository': cb.get('repository', ''), 277 'branch': cb.get('branch', None), 278 'revision': cb.get('revision', None), 279 'project': '', 280 } 281 stampsWithDefaults.append(ss) 282 283 rv = yield self.addBuildsetForSourceStamps( 284 sourcestamps=stampsWithDefaults, reason=reason, 285 waited_for=waited_for, properties=properties, 286 builderNames=builderNames, **kw) 287 return rv 288 289 def getCodebaseDict(self, codebase): 290 # Hook for subclasses to change codebase parameters when a codebase does 291 # not have a change associated with it. 292 try: 293 return defer.succeed(self.codebases[codebase]) 294 except KeyError: 295 return defer.fail() 296 297 @defer.inlineCallbacks 298 def addBuildsetForChanges(self, waited_for=False, reason='', 299 external_idstring=None, changeids=None, builderNames=None, 300 properties=None, 301 **kw): 302 if changeids is None: 303 changeids = [] 304 changesByCodebase = {} 305 306 def get_last_change_for_codebase(codebase): 307 return max(changesByCodebase[codebase], key=lambda change: change["changeid"]) 308 309 # Changes are retrieved from database and grouped by their codebase 310 for changeid in changeids: 311 chdict = yield self.master.db.changes.getChange(changeid) 312 changesByCodebase.setdefault(chdict["codebase"], []).append(chdict) 313 314 sourcestamps = [] 315 for codebase in sorted(self.codebases): 316 if codebase not in changesByCodebase: 317 # codebase has no changes 318 # create a sourcestamp that has no changes 319 cb = yield self.getCodebaseDict(codebase) 320 321 ss = { 322 'codebase': codebase, 323 'repository': cb.get('repository', ''), 324 'branch': cb.get('branch', None), 325 'revision': cb.get('revision', None), 326 'project': '', 327 } 328 else: 329 lastChange = get_last_change_for_codebase(codebase) 330 ss = lastChange['sourcestampid'] 331 sourcestamps.append(ss) 332 333 # add one buildset, using the calculated sourcestamps 334 bsid, brids = yield self.addBuildsetForSourceStamps( 335 waited_for, sourcestamps=sourcestamps, reason=reason, 336 external_idstring=external_idstring, builderNames=builderNames, 337 properties=properties, **kw) 338 339 return (bsid, brids) 340 341 @defer.inlineCallbacks 342 def addBuildsetForSourceStamps(self, waited_for=False, sourcestamps=None, 343 reason='', external_idstring=None, properties=None, 344 builderNames=None, **kw): 345 if sourcestamps is None: 346 sourcestamps = [] 347 # combine properties 348 if properties: 349 properties.updateFromProperties(self.properties) 350 else: 351 properties = self.properties 352 353 # make a fresh copy that we actually can modify safely 354 properties = Properties.fromDict(properties.asDict()) 355 356 # make extra info available from properties.render() 357 properties.master = self.master 358 properties.sourcestamps = [] 359 properties.changes = [] 360 for ss in sourcestamps: 361 if isinstance(ss, int): 362 # fetch actual sourcestamp and changes from data API 363 properties.sourcestamps.append( 364 (yield self.master.data.get(('sourcestamps', ss)))) 365 properties.changes.extend( 366 (yield self.master.data.get(('sourcestamps', ss, 'changes')))) 367 else: 368 # sourcestamp with no change, see addBuildsetForChanges 369 properties.sourcestamps.append(ss) 370 371 for c in properties.changes: 372 properties.updateFromProperties(Properties.fromDict(c['properties'])) 373 374 # apply the default builderNames 375 if not builderNames: 376 builderNames = self.builderNames 377 378 # dynamically get the builder list to schedule 379 builderNames = yield properties.render(builderNames) 380 381 # Get the builder ids 382 # Note that there is a data.updates.findBuilderId(name) 383 # but that would merely only optimize the single builder case, while 384 # probably the multiple builder case will be severely impacted by the 385 # several db requests needed. 386 builderids = list() 387 for bldr in (yield self.master.data.get(('builders', ))): 388 if bldr['name'] in builderNames: 389 builderids.append(bldr['builderid']) 390 391 # translate properties object into a dict as required by the 392 # addBuildset method 393 properties_dict = yield properties.render(properties.asDict()) 394 395 bsid, brids = yield self.master.data.updates.addBuildset( 396 scheduler=self.name, sourcestamps=sourcestamps, reason=reason, 397 waited_for=waited_for, properties=properties_dict, builderids=builderids, 398 external_idstring=external_idstring, **kw) 399 return (bsid, brids) 400