1# This file is part of Buildbot.  Buildbot is free software: you can
2# redistribute it and/or modify it under the terms of the GNU General Public
3# License as published by the Free Software Foundation, version 2.
4#
5# This program is distributed in the hope that it will be useful, but WITHOUT
6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8# details.
9#
10# You should have received a copy of the GNU General Public License along with
11# this program; if not, write to the Free Software Foundation, Inc., 51
12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13#
14# Copyright Buildbot Team Members
15
16
17from twisted.internet import defer
18from twisted.python import log
19
20from buildbot.process.measured_service import MeasuredBuildbotServiceManager
21from buildbot.util import misc
22from buildbot.worker.protocols import pb as bbpb
23
24
class WorkerRegistration:
    """Handle tying one worker to the master's worker manager.

    Instances are created by ``WorkerManager.register``.  ``update`` must be
    called afterwards to create (or refresh) the PB registration, and
    ``unregister`` removes the worker from the manager again.

    Attributes (declared through ``__slots__``):
        master: the master object; ``master.workers`` is used as the manager.
        worker: the worker object; its ``workername`` and ``password`` are read.
        pbReg: result of the last PB ``updateRegistration`` call made by
            ``update``, or ``None`` if no such call has happened yet.
    """

    __slots__ = ['master', 'worker', 'pbReg']

    def __init__(self, master, worker):
        self.master = master
        self.worker = worker
        # Give the slot a defined value: without this, touching ``pbReg``
        # before ``update`` has stored a PB registration fails with an
        # unset-slot AttributeError.
        self.pbReg = None

    def __repr__(self):
        return "<{} for {!r}>".format(self.__class__.__name__, self.worker.workername)

    @defer.inlineCallbacks
    def unregister(self):
        """Remove this worker's PB registration and drop it from the manager.

        Returns a Deferred that fires once both steps have completed.
        """
        bs = self.worker
        # update with portStr=None to remove any registration in place
        yield self.master.workers.pb.updateRegistration(
            bs.workername, bs.password, None)
        yield self.master.workers._unregister(self)

    @defer.inlineCallbacks
    def update(self, worker_config, global_config):
        """Refresh the protocol registration from the given configuration.

        :param worker_config: object providing the ``workername`` and
            ``password`` to register.
        :param global_config: object whose ``protocols`` mapping is inspected;
            when it has a ``'pb'`` entry, that entry's ``'port'`` is used.

        Returns a Deferred.  When ``'pb'`` is not configured nothing is done
        and ``pbReg`` keeps its previous value.
        """
        # For most protocols, there's nothing to do, but for PB we must
        # update the registration in case the port or password has changed.
        if 'pb' in global_config.protocols:
            self.pbReg = yield self.master.workers.pb.updateRegistration(
                worker_config.workername, worker_config.password,
                global_config.protocols['pb']['port'])

    def getPBPort(self):
        """Return ``getPort()`` of the stored PB registration.

        Only meaningful after ``update`` has stored a PB registration; while
        ``pbReg`` is still ``None`` this raises ``AttributeError``.
        """
        return self.pbReg.getPort()
55
56
class WorkerManager(MeasuredBuildbotServiceManager):
    """Service manager tracking worker registrations and live connections.

    ``registrations`` maps worker names to ``WorkerRegistration`` objects and
    ``connections`` maps worker names to the currently accepted connection
    object for that worker.
    """

    name = "WorkerManager"
    managed_services_name = "workers"

    config_attr = "workers"
    # seconds to wait for an existing connection to answer a ping when a
    # duplicate connection for the same worker name shows up
    PING_TIMEOUT = 10
    reconfig_priority = 127

    def __init__(self, master):
        super().__init__()

        self.pb = bbpb.Listener(master)

        # WorkerRegistration instances keyed by worker name
        self.registrations = {}

        # connection objects keyed by worker name
        self.connections = {}

    @property
    def workers(self):
        """Alias for ``namedServices``."""
        # self.workers contains a ready Worker instance for each
        # potential worker, i.e. all the ones listed in the config file.
        # If the worker is connected, self.workers[workername].worker will
        # contain a RemoteReference to their Bot instance. If it is not
        # connected, that attribute will hold None.
        # workers attribute is actually just an alias to multiService's
        # namedService
        return self.namedServices

    def getWorkerByName(self, workerName):
        """Return the worker object registered under ``workerName``.

        Raises ``KeyError`` if no registration exists for that name.
        """
        return self.registrations[workerName].worker

    def register(self, worker):
        """Create and store a ``WorkerRegistration`` for ``worker``.

        Any existing registration under the same ``workername`` is replaced.
        Returns an already-fired Deferred carrying the new registration.
        """
        # TODO: doc that reg.update must be called, too
        workerName = worker.workername
        reg = WorkerRegistration(self.master, worker)
        self.registrations[workerName] = reg
        return defer.succeed(reg)

    def _unregister(self, registration):
        """Forget ``registration``; raises ``KeyError`` if it is not stored."""
        del self.registrations[registration.worker.workername]

    @defer.inlineCallbacks
    def newConnection(self, conn, workerName):
        """Decide whether to accept a new connection for ``workerName``.

        If a connection is already recorded for that name, the old one is
        pinged with ``remotePrint`` under a ``PING_TIMEOUT`` second limit.
        If the ping succeeds the old connection is kept and the new one is
        rejected by raising ``RuntimeError``; if it times out or fails, the
        old connection is dropped and the new one proceeds.

        The new connection is then contacted (``remotePrint`` followed by
        ``remoteGetWorkerInfo``); the returned info is stored on
        ``conn.info`` and the connection is recorded in ``connections``
        until it disconnects.

        :returns: Deferred firing with ``True`` when the connection is
            accepted.
        :raises RuntimeError: the existing connection is still alive.
        :raises Exception: whatever the initial exchange with the new
            connection raised (logged, then re-raised).
        """
        if workerName in self.connections:
            log.msg(("Got duplication connection from '{}'"
                     " starting arbitration procedure").format(workerName))
            old_conn = self.connections[workerName]
            try:
                yield misc.cancelAfter(self.PING_TIMEOUT,
                                       old_conn.remotePrint("master got a duplicate connection"),
                                       self.master.reactor)
            except defer.CancelledError:
                old_conn.loseConnection()
                log.msg("Connected worker '{}' ping timed out after {} seconds".format(workerName,
                        self.PING_TIMEOUT))
            except Exception as e:
                old_conn.loseConnection()
                log.msg("Got error while trying to ping connected worker {}:{}".format(workerName,
                                                                                       e))
            else:
                # The ping came back, so the old connection is still alive
                # and the new one must be rejected.  Raising from ``else``
                # keeps this rejection apart from errors raised by the ping
                # itself, which are handled above as a dead old connection.
                raise RuntimeError("rejecting duplicate worker")
            log.msg("Old connection for '{}' was lost, accepting new".format(workerName))

        try:
            yield conn.remotePrint(message="attached")
            info = yield conn.remoteGetWorkerInfo()
            log.msg("Got workerinfo from '{}'".format(workerName))
        except Exception as e:
            log.msg("Failed to communicate with worker '{}'\n{}".format(workerName, e))
            raise

        conn.info = info
        self.connections[workerName] = conn

        def remove():
            # Only drop the entry while it still refers to this connection.
            # After arbitration the disconnect notification of a replaced
            # connection may arrive late; it must not evict its successor
            # (nor fail with KeyError when the entry is already gone).
            if self.connections.get(workerName) is conn:
                del self.connections[workerName]
        conn.notifyOnDisconnect(remove)

        # accept the connection
        return True
143