# This file is part of Buildbot.  Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members

from twisted.internet import defer
from twisted.python import failure
from twisted.python import log
from zope.interface import implementer

from buildbot import config
from buildbot import interfaces
from buildbot.changes import changes
from buildbot.process.properties import Properties
from buildbot.util.service import ClusteredBuildbotService
from buildbot.util.state import StateMixin


@implementer(interfaces.IScheduler)
class BaseScheduler(ClusteredBuildbotService, StateMixin):
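    """Base class for all schedulers.

    Holds the scheduler's builder names, properties and codebases, tracks its
    enabled state, and provides helpers for consuming change messages and for
    creating buildsets from changes or source stamps.
    """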

    DEFAULT_CODEBASES = {'': {}}

    compare_attrs = ClusteredBuildbotService.compare_attrs + \
        ('builderNames', 'properties', 'codebases')

    def __init__(self, name, builderNames, properties=None,
                 codebases=DEFAULT_CODEBASES):
        super().__init__(name=name)

        ok = True
        if interfaces.IRenderable.providedBy(builderNames):
            pass
        elif isinstance(builderNames, (list, tuple)):
            for b in builderNames:
                if not isinstance(b, str) and \
                        not interfaces.IRenderable.providedBy(b):
                    ok = False
        else:
            ok = False
        if not ok:
            config.error(
                "The builderNames argument to a scheduler must be a list "
                "of Builder names or an IRenderable object that will render "
                "to a list of builder names.")

        self.builderNames = builderNames

        if properties is None:
            properties = {}
        self.properties = Properties()
        self.properties.update(properties, "Scheduler")
        self.properties.setProperty("scheduler", name, "Scheduler")
        self.objectid = None

        # Set the codebases that are necessary to process the changes.
        # These codebases will always result in a sourcestamp, whether or
        # not they have changes.
        known_keys = set(['branch', 'repository', 'revision'])
        if codebases is None:
            config.error("Codebases cannot be None")
        elif isinstance(codebases, list):
            codebases = dict((codebase, {}) for codebase in codebases)
        elif not isinstance(codebases, dict):
            config.error(
                "Codebases must be a dict of dicts, or list of strings")
        else:
            for codebase, attrs in codebases.items():
                if not isinstance(attrs, dict):
                    config.error("Codebases must be a dict of dicts")
                else:
                    unk = set(attrs) - known_keys
                    if unk:
                        config.error("Unknown codebase keys {} for codebase {}".format(
                            ', '.join(unk), codebase))

        self.codebases = codebases

        # internal variables
        self._change_consumer = None
        self._enable_consumer = None
        self._change_consumption_lock = defer.DeferredLock()

        self.enabled = True

    def reconfigService(self, *args, **kwargs):
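        """Reconfigure this scheduler; subclasses must override this."""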
        raise NotImplementedError()

    # activity handling
    @defer.inlineCallbacks
    def activate(self):
        if not self.enabled:
            return None

        # even if we aren't called via _activityPoll(), at this point we
        # need to ensure the service id is set correctly
        if self.serviceid is None:
            self.serviceid = yield self._getServiceId()
            assert self.serviceid is not None

        schedulerData = yield self._getScheduler(self.serviceid)

        if schedulerData:
            self.enabled = schedulerData['enabled']

        if not self._enable_consumer:
            yield self.startConsumingEnableEvents()
        return None

    def _enabledCallback(self, key, msg):
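        # react to a schedulers 'updated' message by activating or
        # deactivating this scheduler according to the new 'enabled' flag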
        if msg['enabled']:
            self.enabled = True
            d = self.activate()
        else:
            d = self.deactivate()

            def fn(x):
                self.enabled = False

            d.addCallback(fn)
        return d

    @defer.inlineCallbacks
    def deactivate(self):
        if not self.enabled:
            return None
        yield self._stopConsumingChanges()
        return None

    # service handling

    def _getServiceId(self):
        return self.master.data.updates.findSchedulerId(self.name)

    def _getScheduler(self, sid):
        return self.master.db.schedulers.getScheduler(sid)

    def _claimService(self):
        return self.master.data.updates.trySetSchedulerMaster(self.serviceid,
                                                              self.master.masterid)

    def _unclaimService(self):
        return self.master.data.updates.trySetSchedulerMaster(self.serviceid,
                                                              None)

    # status queries

    # deprecated: these aren't compatible with distributed schedulers

    def listBuilderNames(self):
        return self.builderNames

    # change handling

    @defer.inlineCallbacks
    def startConsumingChanges(self, fileIsImportant=None, change_filter=None,
                              onlyImportant=False):
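        """Subscribe to new-change messages.  Each change is run through
        change_filter and fileIsImportant (if given); if onlyImportant is set,
        unimportant changes are dropped before gotChange is called."""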
        assert fileIsImportant is None or callable(fileIsImportant)

        # register for changes with the data API
        assert not self._change_consumer
        self._change_consumer = yield self.master.mq.startConsuming(
            lambda k, m: self._changeCallback(k, m, fileIsImportant,
                                              change_filter, onlyImportant),
            ('changes', None, 'new'))

    @defer.inlineCallbacks
    def startConsumingEnableEvents(self):
        assert not self._enable_consumer
        self._enable_consumer = yield self.master.mq.startConsuming(
            self._enabledCallback,
            ('schedulers', str(self.serviceid), 'updated'))

    @defer.inlineCallbacks
    def _changeCallback(self, key, msg, fileIsImportant, change_filter,
                        onlyImportant):

        # ignore changes delivered while we're not running
        if not self._change_consumer:
            return

        # get a change object, since the API requires it
        chdict = yield self.master.db.changes.getChange(msg['changeid'])
        change = yield changes.Change.fromChdict(self.master, chdict)

        # filter it
        if change_filter and not change_filter.filter_change(change):
            return
        if change.codebase not in self.codebases:
            log.msg(format='change contains codebase %(codebase)s that is '
                    'not processed by scheduler %(name)s',
                    codebase=change.codebase, name=self.name)
            return

        if fileIsImportant:
            try:
                important = fileIsImportant(change)
                if not important and onlyImportant:
                    return
            except Exception:
                log.err(failure.Failure(), 'in fileIsImportant check for {}'.format(change))
                return
        else:
            important = True

        # use change_consumption_lock to ensure the service does not stop
        # while this change is being processed
        d = self._change_consumption_lock.run(
            self.gotChange, change, important)
        d.addErrback(log.err, 'while processing change')

    def _stopConsumingChanges(self):
        # (note: called automatically in deactivate)

        # acquire the change consumption lock to ensure that any in-flight
        # change consumption completes before we finish stopping consumption
        def stop():
            if self._change_consumer:
                self._change_consumer.stopConsuming()
                self._change_consumer = None

        return self._change_consumption_lock.run(stop)

    def gotChange(self, change, important):
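        """Called with each change accepted by the consumer; subclasses must
        override this to act on the change."""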
        raise NotImplementedError

    # starting builds

    @defer.inlineCallbacks
    def addBuildsetForSourceStampsWithDefaults(self, reason, sourcestamps=None,
                                               waited_for=False, properties=None, builderNames=None,
                                               **kw):
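        """Add a buildset for the given sourcestamps, applying them on top of
        default sourcestamps built from the scheduler's codebases."""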
        if sourcestamps is None:
            sourcestamps = []

        # convert sourcestamps to a dictionary keyed by codebase
        stampsByCodebase = {}
        for ss in sourcestamps:
            cb = ss['codebase']
            if cb in stampsByCodebase:
                raise RuntimeError("multiple sourcestamps with same codebase")
            stampsByCodebase[cb] = ss

        # Merge codebases with the passed list of sourcestamps
        # This results in a new sourcestamp for each codebase
        stampsWithDefaults = []
        for codebase in self.codebases:
            cb = yield self.getCodebaseDict(codebase)
            ss = {
                'codebase': codebase,
                'repository': cb.get('repository', ''),
                'branch': cb.get('branch', None),
                'revision': cb.get('revision', None),
                'project': '',
            }
            # apply info from passed sourcestamps onto the configured default
            # sourcestamp attributes for this codebase.
            ss.update(stampsByCodebase.get(codebase, {}))
            stampsWithDefaults.append(ss)

        # fill in any supplied sourcestamps that aren't for a codebase in the
        # scheduler's codebase dictionary
        for codebase in set(stampsByCodebase) - set(self.codebases):
            cb = stampsByCodebase[codebase]
            ss = {
                'codebase': codebase,
                'repository': cb.get('repository', ''),
                'branch': cb.get('branch', None),
                'revision': cb.get('revision', None),
                'project': '',
            }
            stampsWithDefaults.append(ss)

        rv = yield self.addBuildsetForSourceStamps(
            sourcestamps=stampsWithDefaults, reason=reason,
            waited_for=waited_for, properties=properties,
            builderNames=builderNames, **kw)
        return rv

    def getCodebaseDict(self, codebase):
        # Hook for subclasses to change codebase parameters when a codebase does
        # not have a change associated with it.
        try:
            return defer.succeed(self.codebases[codebase])
        except KeyError:
            return defer.fail()

    @defer.inlineCallbacks
    def addBuildsetForChanges(self, waited_for=False, reason='',
                              external_idstring=None, changeids=None, builderNames=None,
                              properties=None,
                              **kw):
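        """Add a buildset for the given changeids, using the latest change for
        each codebase and a default sourcestamp for codebases without
        changes."""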
        if changeids is None:
            changeids = []
        changesByCodebase = {}

        def get_last_change_for_codebase(codebase):
            return max(changesByCodebase[codebase], key=lambda change: change["changeid"])

        # Changes are retrieved from the database and grouped by their codebase
        for changeid in changeids:
            chdict = yield self.master.db.changes.getChange(changeid)
            changesByCodebase.setdefault(chdict["codebase"], []).append(chdict)

        sourcestamps = []
        for codebase in sorted(self.codebases):
            if codebase not in changesByCodebase:
                # codebase has no changes
                # create a sourcestamp that has no changes
                cb = yield self.getCodebaseDict(codebase)

                ss = {
                    'codebase': codebase,
                    'repository': cb.get('repository', ''),
                    'branch': cb.get('branch', None),
                    'revision': cb.get('revision', None),
                    'project': '',
                }
            else:
                lastChange = get_last_change_for_codebase(codebase)
                ss = lastChange['sourcestampid']
            sourcestamps.append(ss)

        # add one buildset, using the calculated sourcestamps
        bsid, brids = yield self.addBuildsetForSourceStamps(
            waited_for, sourcestamps=sourcestamps, reason=reason,
            external_idstring=external_idstring, builderNames=builderNames,
            properties=properties, **kw)

        return (bsid, brids)

    @defer.inlineCallbacks
    def addBuildsetForSourceStamps(self, waited_for=False, sourcestamps=None,
                                   reason='', external_idstring=None, properties=None,
                                   builderNames=None, **kw):
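        """Add a single buildset for the given sourcestamps, merging the
        scheduler's properties with any passed properties and resolving
        builderNames to builder ids."""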
        if sourcestamps is None:
            sourcestamps = []
        # combine properties
        if properties:
            properties.updateFromProperties(self.properties)
        else:
            properties = self.properties

        # make a fresh copy that we actually can modify safely
        properties = Properties.fromDict(properties.asDict())

        # make extra info available from properties.render()
        properties.master = self.master
        properties.sourcestamps = []
        properties.changes = []
        for ss in sourcestamps:
            if isinstance(ss, int):
                # fetch actual sourcestamp and changes from data API
                properties.sourcestamps.append(
                    (yield self.master.data.get(('sourcestamps', ss))))
                properties.changes.extend(
                    (yield self.master.data.get(('sourcestamps', ss, 'changes'))))
            else:
                # sourcestamp with no change, see addBuildsetForChanges
                properties.sourcestamps.append(ss)

        for c in properties.changes:
            properties.updateFromProperties(Properties.fromDict(c['properties']))

        # apply the default builderNames
        if not builderNames:
            builderNames = self.builderNames

        # dynamically get the builder list to schedule
        builderNames = yield properties.render(builderNames)

        # Get the builder ids
        # Note that there is a data.updates.findBuilderId(name), but using it
        # would only optimize the single-builder case, while the multiple-builder
        # case would suffer from the several db requests it needs.
        builderids = list()
        for bldr in (yield self.master.data.get(('builders', ))):
            if bldr['name'] in builderNames:
                builderids.append(bldr['builderid'])

        # translate properties object into a dict as required by the
        # addBuildset method
        properties_dict = yield properties.render(properties.asDict())

        bsid, brids = yield self.master.data.updates.addBuildset(
            scheduler=self.name, sourcestamps=sourcestamps, reason=reason,
            waited_for=waited_for, properties=properties_dict, builderids=builderids,
            external_idstring=external_idstring, **kw)
        return (bsid, brids)