# This file is part of Buildbot.  Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members

import hashlib

from twisted.application import service
from twisted.internet import defer
from twisted.internet import task
from twisted.python import log
from twisted.python import reflect
from twisted.python.reflect import accumulateClassList

from buildbot import util
from buildbot.util import bytes2unicode
from buildbot.util import config
from buildbot.util import unicode2bytes


class ReconfigurableServiceMixin:
    """Mixin that propagates a new Buildbot configuration to child services."""

    # children with a higher priority are reconfigured first
    reconfig_priority = 128

    @defer.inlineCallbacks
    def reconfigServiceWithBuildbotConfig(self, new_config):
        """Reconfigure every reconfigurable child service with ``new_config``.

        Does nothing when ``self`` does not provide ``IServiceCollection``.
        Children are processed one after the other, in decreasing
        ``reconfig_priority`` order.
        """
        if not service.IServiceCollection.providedBy(self):
            return

        # get a list of child services to reconfigure
        reconfigurable_services = [svc
                                   for svc in self
                                   if isinstance(svc, ReconfigurableServiceMixin)]

        # sort by priority
        reconfigurable_services.sort(key=lambda svc: -svc.reconfig_priority)

        for svc in reconfigurable_services:
            yield svc.reconfigServiceWithBuildbotConfig(new_config)


# twisted 16's Service is now a new style class, better put everybody new style
# to catch issues even on twisted < 16
class AsyncService(service.Service):
    """A ``Service`` whose parent attach/detach operations return Deferreds."""

    # service.Service.setServiceParent waits for neither disownServiceParent nor addService
    # to complete
    @defer.inlineCallbacks
    def setServiceParent(self, parent):
        """Attach this service to ``parent``, waiting for each step to finish.

        If the service already has a parent it is disowned first.
        """
        if self.parent is not None:
            yield self.disownServiceParent()
        parent = service.IServiceCollection(parent, parent)
        self.parent = parent
        yield self.parent.addService(self)

    # service.Service.disownServiceParent does not wait for removeService to complete before
    # setting parent to None
    @defer.inlineCallbacks
    def disownServiceParent(self):
        """Detach from the current parent once ``removeService`` has completed."""
        yield self.parent.removeService(self)
        self.parent = None

    # We recurse over the parent services until we find a MasterService
    @property
    def master(self):
        """The ``master`` of the parent service, or None when there is no parent."""
        if self.parent is None:
            return None
        return self.parent.master


class AsyncMultiService(AsyncService, service.MultiService):
    """A ``MultiService`` that waits for its children to start and stop."""

    def startService(self):
        """Start this service and all its children.

        Returns a Deferred (from ``defer.gatherResults``) that fires once every
        child's ``startService`` has completed.
        """
        # Do NOT use super() here.
        # The method resolution order would cause MultiService.startService() to
        # be called which we explicitly want to override with this method.
        service.Service.startService(self)
        dl = []
        # if a service attaches another service during the reconfiguration
        # then the service will be started twice, so we don't use iter, but rather
        # copy in a list
        for svc in list(self):
            # handle any deferreds, passing up errors and success
            dl.append(defer.maybeDeferred(svc.startService))
        return defer.gatherResults(dl, consumeErrors=True)

    @defer.inlineCallbacks
    def stopService(self):
        """Stop this service and all its children, in reverse order.

        Children that are not ``SharedService`` instances are stopped first and
        waited for together; ``SharedService`` children are stopped afterwards,
        one at a time.
        """
        # Do NOT use super() here.
        # The method resolution order would cause MultiService.stopService() to
        # be called which we explicitly want to override with this method.
        service.Service.stopService(self)
        services = list(self)
        services.reverse()
        dl = []
        for svc in services:
            if not isinstance(svc, SharedService):
                dl.append(defer.maybeDeferred(svc.stopService))
        # unlike MultiService, consume errors in each individual deferred, and
        # pass the first error in a child service up to our caller
        yield defer.gatherResults(dl, consumeErrors=True)
        for svc in services:
            if isinstance(svc, SharedService):
                yield svc.stopService()

    def addService(self, service):
        """Register ``service`` as a child, starting it if we are running.

        Raises ``RuntimeError`` when a child with the same (non-None) name is
        already registered.  Returns the result of the child's ``startService``
        when this service is running, otherwise an already-fired Deferred.
        """
        if service.name is not None:
            if service.name in self.namedServices:
                raise RuntimeError(("cannot have two services with same name"
                                    " '{}'").format(service.name))
            self.namedServices[service.name] = service
        self.services.append(service)
        if self.running:
            # It may be too late for that, but we will do our best
            service.privilegedStartService()
            return service.startService()
        return defer.succeed(None)


class MasterService(AsyncMultiService):
    # master service is the service that stops the master property recursion

    @property
    def master(self):
        """The master service is its own master."""
        return self


class SharedService(AsyncMultiService):
    """A service that is created only once per parameter set in a parent service."""

    @classmethod
    @defer.inlineCallbacks
    def getService(cls, parent, *args, **kwargs):
        """Return the instance for these parameters, creating it if needed.

        The instance is looked up in ``parent.namedServices`` under the name
        computed by ``getName``.  A newly created instance is attached to
        ``parent`` and moved to the front of ``parent.services``.
        """
        name = cls.getName(*args, **kwargs)
        if name in parent.namedServices:
            return parent.namedServices[name]

        instance = cls(*args, **kwargs)

        # The class is not required to initialize its name
        # but we use the name to identify the instance in the parent service
        # so we force it with the name we used
        instance.name = name
        yield instance.setServiceParent(parent)

        # we put the service on top of the list, so that it is stopped the last
        # This makes sense as the shared service is used as a dependency
        # for other services
        parent.services.remove(instance)
        parent.services.insert(0, instance)
        # hook the return value to the instance object
        return instance

    @classmethod
    def getName(cls, *args, **kwargs):
        """Build a service name from the class name and a SHA-1 of the parameters.

        Positional arguments are hashed in order; keyword arguments are hashed
        as key then value, sorted by key.
        """
        # NOTE(review): the pieces are fed to the hash back to back without any
        # separator, so e.g. ("ab", "c") and ("a", "bc") yield the same name.
        # Left unchanged here so that existing names stay stable.
        _hash = hashlib.sha1()
        for arg in args:
            arg = unicode2bytes(str(arg))
            _hash.update(arg)
        for k, v in sorted(kwargs.items()):
            k = unicode2bytes(str(k))
            v = unicode2bytes(str(v))
            _hash.update(k)
            _hash.update(v)
        return cls.__name__ + "_" + _hash.hexdigest()


class BuildbotService(AsyncMultiService, config.ConfiguredMixin, util.ComparableMixin,
                      ReconfigurableServiceMixin):
    """A named service that keeps its constructor arguments for reconfiguration."""

    compare_attrs = ('name', '_config_args', '_config_kwargs')
    name = None
    configured = False
    objectid = None

    def __init__(self, *args, **kwargs):
        """Record the configuration arguments after running ``checkConfig``.

        The ``name`` keyword, when given, is removed from ``kwargs`` and stored
        as ``self.name``.  Raises ``ValueError`` if no name is set once
        ``checkConfig`` has run.
        """
        name = kwargs.pop("name", None)
        if name is not None:
            self.name = bytes2unicode(name)
        self.checkConfig(*args, **kwargs)
        if self.name is None:
            raise ValueError("{}: must pass a name to constructor".format(type(self)))
        self._config_args = args
        self._config_kwargs = kwargs
        self.rendered = False
        super().__init__()

    def getConfigDict(self):
        """Return a dict with the name, qualified class name, args and kwargs."""
        _type = type(self)
        return {'name': self.name,
                'class': _type.__module__ + "." + _type.__name__,
                'args': self._config_args,
                'kwargs': self._config_kwargs}

    @defer.inlineCallbacks
    def reconfigServiceWithSibling(self, sibling):
        """Reconfigure this service from the configuration held by ``sibling``.

        Returns None without doing anything when this service is already
        configured and ``sibling`` is equivalent to it.  Otherwise the keyword
        arguments listed in the class hierarchy's ``secrets`` attributes are
        rendered, and ``reconfigService`` is called with the sibling's
        arguments; its result is returned.
        """
        # only reconfigure if sibling is configured differently.
        # sibling == self is using ComparableMixin's implementation
        # only compare compare_attrs
        if self.configured and util.ComparableMixin.isEquivalent(sibling, self):
            return None
        self.configured = True
        # Properties import to resolve cyclic import issue
        from buildbot.process.properties import Properties
        p = Properties()
        p.master = self.master
        # render the secret kwargs, one after the other
        secrets = []
        kwargs = {}
        accumulateClassList(self.__class__, 'secrets', secrets)
        for k, v in sibling._config_kwargs.items():
            if k in secrets:
                # for non reconfigurable services, we force the attribute
                v = yield p.render(v)
                setattr(sibling, k, v)
                setattr(self, k, v)
            kwargs[k] = v

        d = yield self.reconfigService(*sibling._config_args,
                                       **kwargs)
        return d

    def canReconfigWithSibling(self, sibling):
        """Return True when ``sibling`` has the same qualified class name as self."""
        return reflect.qual(self.__class__) == reflect.qual(sibling.__class__)

    def configureService(self):
        """Perform the first configuration of this service."""
        # reconfigServiceWithSibling with self, means first configuration
        return self.reconfigServiceWithSibling(self)

    @defer.inlineCallbacks
    def startService(self):
        """Configure the service if that was not done yet, then start it."""
        if not self.configured:
            try:
                yield self.configureService()
            except NotImplementedError:
                # deliberately ignored: a service that raises NotImplementedError
                # on reconfiguration is started without being configured here
                pass
        yield super().startService()

    def checkConfig(self, *args, **kwargs):
        """Validate the constructor arguments; this default accepts anything."""
        return defer.succeed(True)

    def reconfigService(self, name=None, *args, **kwargs):
        """Apply a new configuration; this default does nothing."""
        return defer.succeed(None)

    def renderSecrets(self, *args):
        """Render ``args`` against a ``Properties`` object bound to the master.

        With exactly one argument, returns the result of rendering it;
        otherwise returns a Deferred gathering the rendering of all arguments.
        """
        # Properties import to resolve cyclic import issue
        from buildbot.process.properties import Properties
        p = Properties()
        p.master = self.master

        if len(args) == 1:
            return p.render(args[0])

        return defer.gatherResults([p.render(s) for s in args], consumeErrors=True)


class ClusteredBuildbotService(BuildbotService):

    """
    ClusteredBuildbotService-es are meant to be executed on a single
    master only. When starting such a service, by means of "yield startService",
    it will first try to claim it on the current master and:
    - return without actually starting it
      if it was already claimed by another master (self.active == False).
      It will however keep trying to claim it, in case another master
      stops, and takes the job back.
    - return after it starts else.
    """
    compare_attrs = ('name',)

    POLL_INTERVAL_SEC = 5 * 60  # 5 minutes

    serviceid = None
    active = False
    # Deferred that startService() waits on; None when nobody is waiting
    _startServiceDeferred = None

    def __init__(self, *args, **kwargs):
        """Initialise the activity-tracking state, then the base service."""
        self.serviceid = None
        self.active = False
        self._activityPollCall = None
        self._activityPollDeferred = None
        # assigned for real in startService(); initialised here so that
        # _callbackStartServiceDeferred() is safe to call before the service
        # has been started
        self._startServiceDeferred = None
        super().__init__(*args, **kwargs)

    # activity handling

    def isActive(self):
        """Return whether this instance currently holds the service."""
        return self.active

    def activate(self):
        # will run when this instance becomes THE CHOSEN ONE for the cluster
        return defer.succeed(None)

    def deactivate(self):
        # to be overridden by subclasses
        # will run when this instance loses its chosen status
        return defer.succeed(None)

    # service arbitration hooks

    def _getServiceId(self):
        # retrieve the id for this service; we assume that, once we have a valid id,
        # the id doesn't change. This may return a Deferred.
        raise NotImplementedError

    def _claimService(self):
        # Attempt to claim the service for this master. Should return True or False
        # (optionally via a Deferred) to indicate whether this master now owns the
        # service.
        raise NotImplementedError

    def _unclaimService(self):
        # Release the service from this master. This will only be called by a claimed
        # service, and this really should be robust and release the claim. May return
        # a Deferred.
        raise NotImplementedError

    # default implementation to delegate to the above methods

    @defer.inlineCallbacks
    def startService(self):
        """Start the service and wait for the first claim attempt to settle.

        The wait ends when ``_callbackStartServiceDeferred`` is called from the
        activity poll.
        """
        # subclasses should override startService only to perform actions that should
        # run on all instances, even if they never get activated on this
        # master.
        yield super().startService()
        self._startServiceDeferred = defer.Deferred()
        self._startActivityPolling()
        yield self._startServiceDeferred

    @defer.inlineCallbacks
    def stopService(self):
        """Stop polling, deactivate and unclaim if active, then stop the service.

        Exceptions raised while deactivating or unclaiming are logged and do
        not prevent the service from being stopped.
        """
        # subclasses should override stopService only to perform actions that should
        # run on all instances, even if they never get activated on this
        # master.

        self._stopActivityPolling()

        # need to wait for prior activations to finish
        if self._activityPollDeferred:
            yield self._activityPollDeferred

        if self.active:
            self.active = False

            try:
                yield self.deactivate()
                yield self._unclaimService()
            except Exception as e:
                msg = "Caught exception while deactivating ClusteredService({})".format(self.name)
                log.err(e, _why=msg)

        yield super().stopService()

    def _startActivityPolling(self):
        """Start a ``LoopingCall`` running ``_activityPoll`` immediately and then
        every ``POLL_INTERVAL_SEC`` seconds."""
        self._activityPollCall = task.LoopingCall(self._activityPoll)
        # plug in a clock if we have one, for tests
        if hasattr(self, 'clock'):
            self._activityPollCall.clock = self.clock

        d = self._activityPollCall.start(self.POLL_INTERVAL_SEC, now=True)
        self._activityPollDeferred = d

        # this should never happen, but just in case:
        d.addErrback(log.err, 'while polling for service activity:')

    def _stopActivityPolling(self):
        """Stop the polling loop if there is one.

        Returns the polling Deferred when a loop was stopped, None otherwise.
        """
        if self._activityPollCall:
            self._activityPollCall.stop()
            self._activityPollCall = None
            return self._activityPollDeferred
        return None

    def _callbackStartServiceDeferred(self):
        """Fire the Deferred ``startService`` is waiting on, at most once."""
        if self._startServiceDeferred is not None:
            self._startServiceDeferred.callback(None)
            self._startServiceDeferred = None

    @defer.inlineCallbacks
    def _activityPoll(self):
        """Try to claim the service and, on success, activate it.

        Exceptions are logged rather than raised.
        """
        try:
            # just in case..
            if self.active:
                return

            if self.serviceid is None:
                self.serviceid = yield self._getServiceId()

            try:
                claimed = yield self._claimService()
            except Exception:
                msg = ('WARNING: ClusteredService({}) got exception while trying to claim'
                       ).format(self.name)
                log.err(_why=msg)
                return

            if not claimed:
                # this master is not responsible
                # for this service, we callback for StartService
                # if it was not callback-ed already,
                # and keep polling to take back the service
                # if another one lost it
                self._callbackStartServiceDeferred()
                return

            try:
                # this master is responsible for this service
                # we activate it
                self.active = True
                yield self.activate()
            except Exception:
                # this service is half-active, and noted as such in the db..
                msg = 'WARNING: ClusteredService({}) is only partially active'.format(self.name)
                log.err(_why=msg)
            finally:
                # cannot wait for its deactivation
                # with yield self._stopActivityPolling
                # as we're currently executing the
                # _activityPollCall callback
                # we just call it without waiting its stop
                # (that may open race conditions)
                self._stopActivityPolling()
                self._callbackStartServiceDeferred()
        except Exception:
            # don't pass exceptions into LoopingCall, which can cause it to
            # fail
            msg = 'WARNING: ClusteredService({}) failed during activity poll'.format(self.name)
            log.err(_why=msg)


class BuildbotServiceManager(AsyncMultiService, config.ConfiguredMixin,
                             ReconfigurableServiceMixin):
    """Parent service that keeps its children in sync with one config attribute."""

    # name of the attribute of the new config that lists the child services
    config_attr = "services"
    name = "services"

    def getConfigDict(self):
        """Return a dict with this manager's name and its children's config dicts."""
        return {'name': self.name,
                'childs': [v.getConfigDict()
                           for v in self.namedServices.values()]}

    @defer.inlineCallbacks
    def reconfigServiceWithBuildbotConfig(self, new_config):
        """Add, remove and reconfigure children to match ``new_config``.

        The wanted children are read from ``getattr(new_config, config_attr)``,
        which must be a list of services or a dict of services by name;
        ``TypeError`` is raised otherwise.  ``ValueError`` is raised when a
        child to reconfigure has no name.
        """

        # arrange children by name
        old_by_name = self.namedServices
        old_set = set(old_by_name)
        new_config_attr = getattr(new_config, self.config_attr)
        if isinstance(new_config_attr, list):
            new_by_name = {s.name: s
                           for s in new_config_attr}
        elif isinstance(new_config_attr, dict):
            new_by_name = new_config_attr
        else:
            raise TypeError("config.{} should be a list or dictionary".format(self.config_attr))
        new_set = set(new_by_name)

        # calculate new children, by name, and removed children
        removed_names, added_names = util.diffSets(old_set, new_set)

        # find any children for which the old instance is not
        # able to do a reconfig with the new sibling
        # and add them to both removed and added, so that we
        # run the new version
        for n in old_set & new_set:
            old = old_by_name[n]
            new = new_by_name[n]
            # check if we are able to reconfig service
            if not old.canReconfigWithSibling(new):
                removed_names.add(n)
                added_names.add(n)

        if removed_names or added_names:
            log.msg("adding {} new {}, removing {}".format(len(added_names), self.config_attr,
                                                           len(removed_names)))

            for n in removed_names:
                child = old_by_name[n]
                # disownServiceParent calls stopService after removing the relationship
                # as child might use self.master.data to stop itself, its better to stop it first
                # (this is related to the fact that self.master is found by recursively looking at
                # self.parent for a master)
                yield child.stopService()
                # it has already called, so do not call it again
                child.stopService = lambda: None
                yield child.disownServiceParent()

            for n in added_names:
                child = new_by_name[n]
                # setup service's objectid
                if hasattr(child, 'objectid'):
                    class_name = '{}.{}'.format(child.__class__.__module__,
                                                child.__class__.__name__)
                    objectid = yield self.master.db.state.getObjectId(
                        child.name, class_name)
                    child.objectid = objectid
                yield child.setServiceParent(self)

        # As the services that were just added got
        # reconfigServiceWithSibling called by
        # setServiceParent->startService,
        # we avoid calling it again by selecting
        # in reconfigurable_services, services
        # that were not added just now
        reconfigurable_services = [svc for svc in self
                                   if svc.name not in added_names]
        # sort by priority
        reconfigurable_services.sort(key=lambda svc: -svc.reconfig_priority)

        for svc in reconfigurable_services:
            if not svc.name:
                raise ValueError(
                    "{}: child {} should have a defined name attribute".format(self, svc))
            config_sibling = new_by_name.get(svc.name)
            try:
                yield svc.reconfigServiceWithSibling(config_sibling)
            except NotImplementedError:
                # legacy support. Its too painful to transition old code to new Service life cycle
                # so we implement switch of child when the service raises NotImplementedError
                # Note this means that self will stop, and sibling will take ownership
                # means that we have a small time where the service is unavailable.
                yield svc.disownServiceParent()
                config_sibling.objectid = svc.objectid
                yield config_sibling.setServiceParent(self)