# This file is part of Buildbot. Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members


from twisted.internet import defer
from twisted.python import log

from buildbot.process.measured_service import MeasuredBuildbotServiceManager
from buildbot.util import misc
from buildbot.worker.protocols import pb as bbpb


class WorkerRegistration:
    """Handle created by ``WorkerManager.register`` for one worker.

    Holds the master, the worker and (once ``update`` has run with the PB
    protocol configured) the PB registration object.
    """

    __slots__ = ['master', 'worker', 'pbReg']

    def __init__(self, master, worker):
        self.master = master
        self.worker = worker
        # No PB registration exists until update() stores one.  Initialising
        # the slot keeps attribute access well-defined before that point.
        self.pbReg = None

    def __repr__(self):
        return "<{} for {}>".format(self.__class__.__name__, repr(self.worker.workername))

    @defer.inlineCallbacks
    def unregister(self):
        """Remove the worker's PB registration and drop this registration
        from the worker manager.

        Returns a Deferred that fires once both steps have completed.
        """
        bs = self.worker
        # update with portStr=None to remove any registration in place
        yield self.master.workers.pb.updateRegistration(
            bs.workername, bs.password, None)
        yield self.master.workers._unregister(self)

    @defer.inlineCallbacks
    def update(self, worker_config, global_config):
        """Refresh protocol registrations after a (re)configuration.

        ``worker_config`` supplies ``workername`` and ``password``;
        ``global_config.protocols`` is consulted for a ``'pb'`` entry whose
        ``'port'`` value is passed on to the PB listener.
        """
        # For most protocols, there's nothing to do, but for PB we must
        # update the registration in case the port or password has changed.
        if 'pb' in global_config.protocols:
            self.pbReg = yield self.master.workers.pb.updateRegistration(
                worker_config.workername, worker_config.password,
                global_config.protocols['pb']['port'])

    def getPBPort(self):
        """Return the port reported by the stored PB registration.

        Raises AttributeError if no PB registration has been stored by
        ``update`` (``pbReg`` is still None).
        """
        return self.pbReg.getPort()


class WorkerManager(MeasuredBuildbotServiceManager):
    """Service manager tracking worker registrations and live connections."""

    name = "WorkerManager"
    managed_services_name = "workers"

    config_attr = "workers"
    # seconds to wait for an existing connection to answer a ping when a
    # duplicate connection for the same worker name arrives
    PING_TIMEOUT = 10
    reconfig_priority = 127

    def __init__(self, master):
        super().__init__()

        self.pb = bbpb.Listener(master)

        # WorkerRegistration instances keyed by worker name
        self.registrations = {}

        # connection objects keyed by worker name
        self.connections = {}

    @property
    def workers(self):
        # self.workers contains a ready Worker instance for each
        # potential worker, i.e. all the ones listed in the config file.
        # If the worker is connected, self.workers[workername].worker will
        # contain a RemoteReference to their Bot instance. If it is not
        # connected, that attribute will hold None.
        # workers attribute is actually just an alias to multiService's
        # namedService
        return self.namedServices

    def getWorkerByName(self, workerName):
        """Return the worker registered under ``workerName``.

        Raises KeyError if no registration exists for that name.
        """
        return self.registrations[workerName].worker

    def register(self, worker):
        """Create and record a WorkerRegistration for ``worker``.

        Returns an already-fired Deferred whose result is the registration.
        """
        # TODO: doc that reg.update must be called, too
        workerName = worker.workername
        reg = WorkerRegistration(self.master, worker)
        self.registrations[workerName] = reg
        return defer.succeed(reg)

    def _unregister(self, registration):
        """Forget ``registration``; raises KeyError if it is not recorded."""
        del self.registrations[registration.worker.workername]

    @defer.inlineCallbacks
    def newConnection(self, conn, workerName):
        """Accept or reject a new connection claiming to be ``workerName``.

        If a connection is already recorded under that name, it is pinged
        first: a live old connection causes the new one to be rejected with
        RuntimeError, while a timed-out or failing old connection is dropped
        and the new one accepted.

        Returns a Deferred firing with True when the connection is accepted.
        Any exception raised while talking to the new connection is logged
        and re-raised.
        """
        if workerName in self.connections:
            log.msg(("Got duplication connection from '{}'"
                     " starting arbitration procedure").format(workerName))
            old_conn = self.connections[workerName]
            try:
                yield misc.cancelAfter(self.PING_TIMEOUT,
                                       old_conn.remotePrint("master got a duplicate connection"),
                                       self.master.reactor)
                # if we get here then old connection is still alive, and new
                # should be rejected
                raise RuntimeError("rejecting duplicate worker")
            except defer.CancelledError:
                old_conn.loseConnection()
                log.msg("Connected worker '{}' ping timed out after {} seconds".format(workerName,
                        self.PING_TIMEOUT))
            except RuntimeError:
                raise
            except Exception as e:
                old_conn.loseConnection()
                log.msg("Got error while trying to ping connected worker {}:{}".format(workerName,
                                                                                       e))
            log.msg("Old connection for '{}' was lost, accepting new".format(workerName))

        try:
            yield conn.remotePrint(message="attached")
            info = yield conn.remoteGetWorkerInfo()
            log.msg("Got workerinfo from '{}'".format(workerName))
        except Exception as e:
            log.msg("Failed to communicate with worker '{}'\n{}".format(workerName, e))
            raise

        conn.info = info
        self.connections[workerName] = conn

        def remove():
            # Only drop the entry while it still refers to this connection.
            # If the slot was already cleared, or has since been taken over
            # by a newer connection for the same worker name, a late
            # disconnect notification must neither raise KeyError nor evict
            # the newer connection.
            if self.connections.get(workerName) is conn:
                del self.connections[workerName]
        conn.notifyOnDisconnect(remove)

        # accept the connection
        return True