1# This file is part of Buildbot. Buildbot is free software: you can
2# redistribute it and/or modify it under the terms of the GNU General Public
3# License as published by the Free Software Foundation, version 2.
4#
5# This program is distributed in the hope that it will be useful, but WITHOUT
6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8# details.
9#
10# You should have received a copy of the GNU General Public License along with
11# this program; if not, write to the Free Software Foundation, Inc., 51
12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13#
14# Copyright Buildbot Team Members
15
16import hashlib
17
18from twisted.application import service
19from twisted.internet import defer
20from twisted.internet import task
21from twisted.python import log
22from twisted.python import reflect
23from twisted.python.reflect import accumulateClassList
24
25from buildbot import util
26from buildbot.util import bytes2unicode
27from buildbot.util import config
28from buildbot.util import unicode2bytes
29
30
class ReconfigurableServiceMixin:
    """Forward a new master configuration to reconfigurable child services.

    When the object mixing this in is a service collection,
    ``reconfigServiceWithBuildbotConfig`` calls the same method on every child
    that is itself a ``ReconfigurableServiceMixin``. Children with the highest
    ``reconfig_priority`` go first, and each child finishes before the next
    starts.
    """

    # Larger values are reconfigured earlier.
    reconfig_priority = 128

    @defer.inlineCallbacks
    def reconfigServiceWithBuildbotConfig(self, new_config):
        # Only service collections have children to forward the config to.
        if not service.IServiceCollection.providedBy(self):
            return

        children = [child for child in self
                    if isinstance(child, ReconfigurableServiceMixin)]
        # Descending priority. Sorting is stable with reverse=True, so children
        # with equal priority keep their original relative order.
        children.sort(key=lambda child: child.reconfig_priority, reverse=True)

        for child in children:
            yield child.reconfigServiceWithBuildbotConfig(new_config)
50
51
# Twisted 16 made Service a new-style class; keep all of our services new-style
# as well, so that related issues are caught even on Twisted < 16.
class AsyncService(service.Service):
    """A Twisted Service whose parent-management methods wait for completion."""

    @defer.inlineCallbacks
    def setServiceParent(self, parent):
        """Attach this service to *parent*, detaching from any current parent first.

        Unlike ``service.Service.setServiceParent``, this waits for both
        ``disownServiceParent`` and ``parent.addService`` to complete.
        """
        if self.parent is not None:
            yield self.disownServiceParent()
        self.parent = service.IServiceCollection(parent, parent)
        yield self.parent.addService(self)

    @defer.inlineCallbacks
    def disownServiceParent(self):
        """Detach from the current parent.

        Unlike ``service.Service.disownServiceParent``, ``self.parent`` is
        cleared only after ``removeService`` has completed.
        """
        current_parent = self.parent
        yield current_parent.removeService(self)
        self.parent = None

    @property
    def master(self):
        """The master owning this service, or None if detached.

        Found by recursing up the parent chain until a MasterService answers.
        """
        parent = self.parent
        return None if parent is None else parent.master
79
80
class AsyncMultiService(AsyncService, service.MultiService):
    """A MultiService whose start/stop/add operations wait for its children."""

    def startService(self):
        """Start this service and, concurrently, all of its current children.

        Returns a Deferred that fires once every child's ``startService`` has
        completed. It fails (with ``defer.FirstError``) if any child fails.
        """
        # Do NOT use super() here.
        # The method resolution order would cause MultiService.startService() to
        # be called which we explicitly want to override with this method.
        service.Service.startService(self)
        # If a service attaches another service while starting, addService
        # starts it (we are already running). Iterate over a snapshot so it is
        # not started a second time here.
        dl = [defer.maybeDeferred(svc.startService) for svc in list(self)]
        return defer.gatherResults(dl, consumeErrors=True)

    @defer.inlineCallbacks
    def stopService(self):
        """Stop all children, stopping SharedService children last.

        Ordinary children are stopped concurrently, in reverse order of
        addition. SharedService children, which other services may depend on,
        are then stopped one at a time.

        The shared services are stopped even if stopping an ordinary child
        failed; that first failure is re-raised afterwards.
        """
        # Do NOT use super() here.
        # The method resolution order would cause MultiService.stopService() to
        # be called which we explicitly want to override with this method.
        service.Service.stopService(self)
        services = list(self)
        services.reverse()
        # Unlike MultiService, consume errors in each individual deferred, and
        # pass the first error in a child service up to our caller.
        dl = [defer.maybeDeferred(svc.stopService)
              for svc in services if not isinstance(svc, SharedService)]
        try:
            yield defer.gatherResults(dl, consumeErrors=True)
        finally:
            # Previously a failure above skipped this loop and left the shared
            # services running.
            for svc in services:
                if isinstance(svc, SharedService):
                    yield svc.stopService()

    def addService(self, service):
        """Register a child service, starting it right away if we are running.

        Inside this method, ``service`` is the child, not the twisted module.

        Raises RuntimeError if a child with the same name is already
        registered. When running, returns the child's ``startService()``
        result; otherwise returns an already-fired Deferred.
        """
        if service.name is not None:
            if service.name in self.namedServices:
                raise RuntimeError(("cannot have two services with same name"
                                    " '{}'").format(service.name))
            self.namedServices[service.name] = service
        self.services.append(service)
        if self.running:
            # It may be too late for that, but we will do our best
            service.privilegedStartService()
            return service.startService()
        return defer.succeed(None)
128
129
class MasterService(AsyncMultiService):
    """Root of the service tree: the recursive ``master`` lookup ends here."""

    @property
    def master(self):
        # Descendants resolve ``master`` via their parent; the root is the master.
        return self
136
137
class SharedService(AsyncMultiService):
    """a service that is created only once per parameter set in a parent service"""

    @classmethod
    @defer.inlineCallbacks
    def getService(cls, parent, *args, **kwargs):
        """Return the shared instance of *cls* for these arguments under *parent*.

        If *parent* already has a child named ``getName(*args, **kwargs)``,
        that child is returned. Otherwise a new instance is constructed,
        attached to *parent*, and returned.
        """
        name = cls.getName(*args, **kwargs)
        if name not in parent.namedServices:
            instance = cls(*args, **kwargs)
            # Subclasses are not required to set a name, but the parent looks
            # the shared instance up by it, so force the computed name.
            instance.name = name
            yield instance.setServiceParent(parent)
            # Move the instance to the front of the child list so it is stopped
            # last, since other services may depend on it.
            siblings = parent.services
            siblings.remove(instance)
            siblings.insert(0, instance)
            return instance
        return parent.namedServices[name]

    @classmethod
    def getName(cls, *args, **kwargs):
        """Return ``<ClassName>_<sha1 hexdigest>`` identifying these arguments.

        The digest covers ``str()`` of each positional argument in order, then
        ``str()`` of each keyword name and value, in sorted keyword order. The
        pieces are concatenated without separators.
        """
        digest = hashlib.sha1()
        pieces = [str(arg) for arg in args]
        for key, value in sorted(kwargs.items()):
            pieces.extend((str(key), str(value)))
        for piece in pieces:
            digest.update(unicode2bytes(piece))
        return "{}_{}".format(cls.__name__, digest.hexdigest())
176
177
class BuildbotService(AsyncMultiService, config.ConfiguredMixin, util.ComparableMixin,
                      ReconfigurableServiceMixin):
    """Base class for named, configurable, reconfigurable Buildbot services.

    Constructor arguments other than ``name`` are validated by ``checkConfig``
    and recorded. They are applied later through ``reconfigService``: on first
    start, or when reconfiguring from a freshly constructed sibling.
    """
    compare_attrs = ('name', '_config_args', '_config_kwargs')
    name = None
    configured = False
    objectid = None

    def __init__(self, *args, **kwargs):
        name = kwargs.pop("name", None)
        if name is not None:
            self.name = bytes2unicode(name)
        # The name check comes after checkConfig, so checkConfig may still set
        # self.name itself.
        self.checkConfig(*args, **kwargs)
        if self.name is None:
            raise ValueError("{}: must pass a name to constructor".format(type(self)))
        self._config_args, self._config_kwargs = args, kwargs
        self.rendered = False
        super().__init__()

    def getConfigDict(self):
        """Describe this service: name, qualified class name, and config args."""
        cls = type(self)
        return {
            'name': self.name,
            'class': "{}.{}".format(cls.__module__, cls.__name__),
            'args': self._config_args,
            'kwargs': self._config_kwargs,
        }

    @defer.inlineCallbacks
    def reconfigServiceWithSibling(self, sibling):
        """Apply *sibling*'s recorded configuration through ``reconfigService``.

        Skipped when already configured and *sibling* matches on
        ``compare_attrs`` (ComparableMixin equivalence).

        Keyword arguments named in any ``secrets`` class attribute are rendered
        one after another before being passed on. Each rendered value is also
        set as an attribute on both ``self`` and *sibling*.
        """
        if self.configured and util.ComparableMixin.isEquivalent(sibling, self):
            return None
        self.configured = True

        # Imported here to avoid a cyclic import.
        from buildbot.process.properties import Properties
        props = Properties()
        props.master = self.master

        secret_names = []
        accumulateClassList(self.__class__, 'secrets', secret_names)

        kwargs = {}
        for key, value in sibling._config_kwargs.items():
            if key in secret_names:
                value = yield props.render(value)
                # Force the attribute so that services which do not really
                # reconfigure still see the rendered value.
                setattr(sibling, key, value)
                setattr(self, key, value)
            kwargs[key] = value

        result = yield self.reconfigService(*sibling._config_args, **kwargs)
        return result

    def canReconfigWithSibling(self, sibling):
        """True when *sibling* has the same fully-qualified class as self."""
        own_class = reflect.qual(self.__class__)
        return own_class == reflect.qual(sibling.__class__)

    def configureService(self):
        # Reconfiguring with ourselves as sibling is the first configuration.
        return self.reconfigServiceWithSibling(self)

    @defer.inlineCallbacks
    def startService(self):
        """Configure on first start, then start the service and its children."""
        needs_configuration = not self.configured
        if needs_configuration:
            try:
                yield self.configureService()
            except NotImplementedError:
                # A NotImplementedError from configuration is tolerated; the
                # service is started anyway.
                pass
        yield super().startService()

    def checkConfig(self, *args, **kwargs):
        """Validate constructor arguments; the default accepts anything."""
        return defer.succeed(True)

    def reconfigService(self, name=None, *args, **kwargs):
        """Apply configuration arguments; the default does nothing."""
        return defer.succeed(None)

    def renderSecrets(self, *args):
        """Render the given values with a Properties bound to this master.

        A single argument yields its rendered value. Otherwise the result is a
        Deferred list of rendered values, in argument order.
        """
        # Imported here to avoid a cyclic import.
        from buildbot.process.properties import Properties
        props = Properties()
        props.master = self.master

        if len(args) != 1:
            return defer.gatherResults([props.render(arg) for arg in args],
                                       consumeErrors=True)
        return props.render(args[0])
265
266
class ClusteredBuildbotService(BuildbotService):

    """A BuildbotService meant to be active on only one master of a cluster.

    ``yield startService()`` first tries to claim the service for this master:

    - If the claim fails (another master holds it), startService returns
      without activating the service, so ``self.active`` stays False. It keeps
      polling every ``POLL_INTERVAL_SEC`` seconds so it can take the service
      back if the other master stops.
    - If the claim succeeds, ``activate()`` runs and startService returns once
      it has finished.

    Subclasses implement ``_getServiceId``, ``_claimService`` and
    ``_unclaimService``, and override ``activate``/``deactivate``.
    """
    compare_attrs = ('name',)

    POLL_INTERVAL_SEC = 5 * 60  # 5 minutes

    serviceid = None
    active = False

    def __init__(self, *args, **kwargs):

        self.serviceid = None
        self.active = False
        self._activityPollCall = None
        self._activityPollDeferred = None
        # Deferred that startService waits on until the first claim attempt
        # settles. Initialized here so stopService can always release it.
        self._startServiceDeferred = None
        super().__init__(*args, **kwargs)

    # activity handling

    def isActive(self):
        return self.active

    def activate(self):
        # will run when this instance becomes THE CHOSEN ONE for the cluster
        return defer.succeed(None)

    def deactivate(self):
        # to be overridden by subclasses
        # will run when this instance loses its chosen status
        return defer.succeed(None)

    # service arbitration hooks

    def _getServiceId(self):
        # retrieve the id for this service; we assume that, once we have a valid id,
        # the id doesn't change. This may return a Deferred.
        raise NotImplementedError

    def _claimService(self):
        # Attempt to claim the service for this master. Should return True or False
        # (optionally via a Deferred) to indicate whether this master now owns the
        # service.
        raise NotImplementedError

    def _unclaimService(self):
        # Release the service from this master. This will only be called by a claimed
        # service, and this really should be robust and release the claim. May return
        # a Deferred.
        raise NotImplementedError

    # default implementation to delegate to the above methods

    @defer.inlineCallbacks
    def startService(self):
        # subclasses should override startService only to perform actions that should
        # run on all instances, even if they never get activated on this
        # master.
        yield super().startService()
        self._startServiceDeferred = defer.Deferred()
        self._startActivityPolling()
        # Released by _activityPoll once a claim attempt settles, or by
        # stopService if the service is stopped before that happens.
        yield self._startServiceDeferred

    @defer.inlineCallbacks
    def stopService(self):
        # subclasses should override stopService only to perform actions that should
        # run on all instances, even if they never get activated on this
        # master.

        self._stopActivityPolling()

        # need to wait for prior activations to finish
        if self._activityPollDeferred:
            yield self._activityPollDeferred

        # Polling has stopped, so no poll will ever release a startService()
        # that is still waiting (e.g. because the first claim attempt raised).
        # Release it here so it cannot hang forever.
        self._callbackStartServiceDeferred()

        if self.active:
            self.active = False

            try:
                yield self.deactivate()
                yield self._unclaimService()
            except Exception as e:
                msg = "Caught exception while deactivating ClusteredService({})".format(self.name)
                log.err(e, _why=msg)

        yield super().stopService()

    def _startActivityPolling(self):
        """Start polling for the claim every POLL_INTERVAL_SEC, first poll immediately."""
        self._activityPollCall = task.LoopingCall(self._activityPoll)
        # plug in a clock if we have one, for tests
        if hasattr(self, 'clock'):
            self._activityPollCall.clock = self.clock

        d = self._activityPollCall.start(self.POLL_INTERVAL_SEC, now=True)
        self._activityPollDeferred = d

        # this should never happen, but just in case:
        d.addErrback(log.err, 'while polling for service activity:')

    def _stopActivityPolling(self):
        """Stop the polling loop if it is running.

        Returns the loop's Deferred, or None if polling was not running.
        """
        if self._activityPollCall:
            self._activityPollCall.stop()
            self._activityPollCall = None
            return self._activityPollDeferred
        return None

    def _callbackStartServiceDeferred(self):
        """Fire the Deferred startService is waiting on, at most once."""
        if self._startServiceDeferred is not None:
            self._startServiceDeferred.callback(None)
            self._startServiceDeferred = None

    @defer.inlineCallbacks
    def _activityPoll(self):
        """One polling round: try to claim the service and activate it if claimed.

        Every exception is caught and logged so the LoopingCall keeps running.
        """
        try:
            # just in case..
            if self.active:
                return

            if self.serviceid is None:
                self.serviceid = yield self._getServiceId()

            try:
                claimed = yield self._claimService()
            except Exception:
                msg = ('WARNING: ClusteredService({}) got exception while trying to claim'
                       ).format(self.name)
                log.err(_why=msg)
                return

            if not claimed:
                # this master is not responsible
                # for this service, we callback for StartService
                # if it was not callback-ed already,
                # and keep polling to take back the service
                # if another one lost it
                self._callbackStartServiceDeferred()
                return

            try:
                # this master is responsible for this service
                # we activate it
                self.active = True
                yield self.activate()
            except Exception:
                # this service is half-active, and noted as such in the db..
                msg = 'WARNING: ClusteredService({}) is only partially active'.format(self.name)
                log.err(_why=msg)
            finally:
                # cannot wait for its deactivation
                # with yield self._stopActivityPolling
                # as we're currently executing the
                # _activityPollCall callback
                # we just call it without waiting its stop
                # (that may open race conditions)
                self._stopActivityPolling()
                self._callbackStartServiceDeferred()
        except Exception:
            # don't pass exceptions into LoopingCall, which can cause it to
            # fail
            msg = 'WARNING: ClusteredService({}) failed during activity poll'.format(self.name)
            log.err(_why=msg)
437
438
class BuildbotServiceManager(AsyncMultiService, config.ConfiguredMixin,
                             ReconfigurableServiceMixin):
    """Keep named child services in sync with one attribute of the config.

    The attribute is named by ``config_attr``. On reconfiguration, children
    absent from the new config are removed and new ones are added. Children
    that cannot reconfigure from their new sibling are replaced, and the rest
    are reconfigured in place.
    """
    config_attr = "services"
    name = "services"

    def getConfigDict(self):
        children = [child.getConfigDict() for child in self.namedServices.values()]
        return {'name': self.name, 'childs': children}

    @defer.inlineCallbacks
    def reconfigServiceWithBuildbotConfig(self, new_config):
        # NOTE: this is the live mapping; removeService/addService below
        # mutate it, so names are snapshotted into a set first.
        current = self.namedServices
        current_names = set(current)

        configured = getattr(new_config, self.config_attr)
        if isinstance(configured, dict):
            wanted = configured
        elif isinstance(configured, list):
            wanted = {svc.name: svc for svc in configured}
        else:
            raise TypeError("config.{} should be a list or dictionary".format(self.config_attr))
        wanted_names = set(wanted)

        removed_names, added_names = util.diffSets(current_names, wanted_names)

        # A surviving child that cannot reconfigure itself from its new
        # sibling is replaced: mark it both removed and added.
        for name in current_names & wanted_names:
            if not current[name].canReconfigWithSibling(wanted[name]):
                removed_names.add(name)
                added_names.add(name)

        if removed_names or added_names:
            log.msg("adding {} new {}, removing {}".format(len(added_names), self.config_attr,
                                                           len(removed_names)))

            for name in removed_names:
                child = current[name]
                # Stop the child while it is still attached: it may need
                # self.master (found via self.parent) to shut down cleanly.
                yield child.stopService()
                # disownServiceParent would call stopService again; neuter it.
                child.stopService = lambda: None
                yield child.disownServiceParent()

            for name in added_names:
                child = wanted[name]
                # Give the new child its persistent object id before it starts.
                if hasattr(child, 'objectid'):
                    class_name = '{}.{}'.format(child.__class__.__module__,
                                                child.__class__.__name__)
                    child.objectid = yield self.master.db.state.getObjectId(
                        child.name, class_name)
                yield child.setServiceParent(self)

        # Children added above were already configured when
        # setServiceParent started them, so skip them here. Highest
        # reconfig_priority first; the stable sort keeps ties in order.
        to_reconfigure = sorted((svc for svc in self if svc.name not in added_names),
                                key=lambda svc: svc.reconfig_priority, reverse=True)

        for svc in to_reconfigure:
            if not svc.name:
                raise ValueError(
                    "{}: child {} should have a defined name attribute".format(self, svc))
            sibling = wanted.get(svc.name)
            try:
                yield svc.reconfigServiceWithSibling(sibling)
            except NotImplementedError:
                # Legacy services that cannot reconfigure are swapped out: the
                # old instance stops and the sibling takes its place (with the
                # same objectid), leaving a brief window with no service.
                yield svc.disownServiceParent()
                sibling.objectid = svc.objectid
                yield sibling.setServiceParent(self)
532