1# This file is part of Buildbot.  Buildbot is free software: you can
2# redistribute it and/or modify it under the terms of the GNU General Public
3# License as published by the Free Software Foundation, version 2.
4#
5# This program is distributed in the hope that it will be useful, but WITHOUT
6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
8# details.
9#
10# You should have received a copy of the GNU General Public License along with
11# this program; if not, write to the Free Software Foundation, Inc., 51
12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13#
14# Copyright Buildbot Team Members
15
16
17import os
18
19from twisted.cred import credentials
20from twisted.internet import defer
21from twisted.internet import reactor
22from twisted.internet.endpoints import clientFromString
23from twisted.python import log
24from twisted.python import util
25from twisted.spread import pb
26from twisted.trial import unittest
27
28import buildbot
29from buildbot import config
30from buildbot import pbmanager
31from buildbot import worker
32from buildbot.process import botmaster
33from buildbot.process import builder
34from buildbot.process import factory
35from buildbot.test.fake import fakemaster
36from buildbot.test.util.misc import TestReactorMixin
37from buildbot.util.eventual import eventually
38from buildbot.worker import manager as workermanager
39
40PKI_DIR = util.sibpath(__file__, 'pki')
41
42
43class FakeWorkerForBuilder(pb.Referenceable):
44
45    """
46    Fake worker-side WorkerForBuilder object
47    """
48
49
50class FakeWorkerWorker(pb.Referenceable):
51
52    """
53    Fake worker-side Worker object
54
55    @ivar master_persp: remote perspective on the master
56    """
57
58    def __init__(self, callWhenBuilderListSet):
59        self.callWhenBuilderListSet = callWhenBuilderListSet
60        self.master_persp = None
61        self._detach_deferreds = []
62        self._detached = False
63
64    def waitForDetach(self):
65        if self._detached:
66            return defer.succeed(None)
67        d = defer.Deferred()
68        self._detach_deferreds.append(d)
69        return d
70
71    def setMasterPerspective(self, persp):
72        self.master_persp = persp
73        # clear out master_persp on disconnect
74
75        def clear_persp():
76            self.master_persp = None
77        persp.broker.notifyOnDisconnect(clear_persp)
78
79        def fire_deferreds():
80            self._detached = True
81            self._detach_deferreds, deferreds = None, self._detach_deferreds
82            for d in deferreds:
83                d.callback(None)
84        persp.broker.notifyOnDisconnect(fire_deferreds)
85
86    def remote_print(self, message):
87        log.msg("WORKER-SIDE: remote_print(%r)" % (message,))
88
89    def remote_getWorkerInfo(self):
90        return {
91            'info': 'here',
92            'worker_commands': {
93                'x': 1,
94            },
95            'numcpus': 1,
96            'none': None,
97            'os_release': b'\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88'.decode(),
98            b'\xe3\x83\xaa\xe3\x83\xaa\xe3\x83\xbc\xe3\x82\xb9\xe3'
99            b'\x83\x86\xe3\x82\xb9\xe3\x83\x88'.decode():
100                b'\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88'.decode(),
101        }
102
103    def remote_getVersion(self):
104        return buildbot.version
105
106    def remote_getCommands(self):
107        return {'x': 1}
108
109    def remote_setBuilderList(self, builder_info):
110        builder_names = [n for n, dir in builder_info]
111        slbuilders = [FakeWorkerForBuilder() for n in builder_names]
112        eventually(self.callWhenBuilderListSet)
113        return dict(zip(builder_names, slbuilders))
114
115
116class FakeBuilder(builder.Builder):
117
118    def attached(self, worker, commands):
119        return defer.succeed(None)
120
121    def detached(self, worker):
122        pass
123
124    def getOldestRequestTime(self):
125        return 0
126
127    def maybeStartBuild(self):
128        return defer.succeed(None)
129
130
131class MyWorker(worker.Worker):
132
133    def attached(self, conn):
134        self.detach_d = defer.Deferred()
135        return super().attached(conn)
136
137    def detached(self):
138        super().detached()
139        self.detach_d, d = None, self.detach_d
140        d.callback(None)
141
142
143class TestWorkerComm(unittest.TestCase, TestReactorMixin):
144
145    """
146    Test handling of connections from workers as integrated with
147     - Twisted Spread
148     - real TCP connections.
149     - PBManager
150
151    @ivar master: fake build master
152    @ivar pbamanger: L{PBManager} instance
153    @ivar botmaster: L{BotMaster} instance
154    @ivar worker: master-side L{Worker} instance
155    @ivar workerworker: worker-side L{FakeWorkerWorker} instance
156    @ivar port: TCP port to connect to
157    @ivar server_connection_string: description string for the server endpoint
158    @ivar client_connection_string_tpl: description string template for the client
159                                endpoint (expects to passed 'port')
160    @ivar endpoint: endpoint controlling the outbound connection
161                    from worker to master
162    """
163
164    @defer.inlineCallbacks
165    def setUp(self):
166        self.setUpTestReactor()
167        self.master = fakemaster.make_master(self, wantMq=True, wantData=True,
168                                             wantDb=True)
169
170        # set the worker port to a loopback address with unspecified
171        # port
172        self.pbmanager = self.master.pbmanager = pbmanager.PBManager()
173        yield self.pbmanager.setServiceParent(self.master)
174
175        # remove the fakeServiceParent from fake service hierarchy, and replace
176        # by a real one
177        yield self.master.workers.disownServiceParent()
178        self.workers = self.master.workers = workermanager.WorkerManager(
179            self.master)
180        yield self.workers.setServiceParent(self.master)
181
182        self.botmaster = botmaster.BotMaster()
183        yield self.botmaster.setServiceParent(self.master)
184
185        self.master.botmaster = self.botmaster
186        self.master.data.updates.workerConfigured = lambda *a, **k: None
187        yield self.master.startService()
188
189        self.buildworker = None
190        self.port = None
191        self.workerworker = None
192        self.endpoint = None
193        self.broker = None
194        self._detach_deferreds = []
195
196        # patch in our FakeBuilder for the regular Builder class
197        self.patch(botmaster, 'Builder', FakeBuilder)
198
199        self.server_connection_string = "tcp:0:interface=127.0.0.1"
200        self.client_connection_string_tpl = "tcp:host=127.0.0.1:port={port}"
201
202    def tearDown(self):
203        if self.broker:
204            del self.broker
205        if self.endpoint:
206            del self.endpoint
207        deferreds = self._detach_deferreds + [
208            self.pbmanager.stopService(),
209            self.botmaster.stopService(),
210            self.workers.stopService(),
211        ]
212
213        # if the worker is still attached, wait for it to detach, too
214        if self.buildworker and self.buildworker.detach_d:
215            deferreds.append(self.buildworker.detach_d)
216
217        return defer.gatherResults(deferreds)
218
219    @defer.inlineCallbacks
220    def addWorker(self, **kwargs):
221        """
222        Create a master-side worker instance and add it to the BotMaster
223
224        @param **kwargs: arguments to pass to the L{Worker} constructor.
225        """
226        self.buildworker = MyWorker("testworker", "pw", **kwargs)
227
228        # reconfig the master to get it set up
229        new_config = self.master.config
230        new_config.protocols = {"pb": {"port": self.server_connection_string}}
231        new_config.workers = [self.buildworker]
232        new_config.builders = [config.BuilderConfig(
233            name='bldr',
234            workername='testworker', factory=factory.BuildFactory())]
235
236        yield self.botmaster.reconfigServiceWithBuildbotConfig(new_config)
237        yield self.workers.reconfigServiceWithBuildbotConfig(new_config)
238
239        # as part of the reconfig, the worker registered with the pbmanager, so
240        # get the port it was assigned
241        self.port = self.buildworker.registration.getPBPort()
242
243    def connectWorker(self, waitForBuilderList=True):
244        """
245        Connect a worker the master via PB
246
247        @param waitForBuilderList: don't return until the setBuilderList has
248        been called
249        @returns: L{FakeWorkerWorker} and a Deferred that will fire when it
250        is detached; via deferred
251        """
252        factory = pb.PBClientFactory()
253        creds = credentials.UsernamePassword(b"testworker", b"pw")
254        setBuilderList_d = defer.Deferred()
255        workerworker = FakeWorkerWorker(
256            lambda: setBuilderList_d.callback(None))
257
258        login_d = factory.login(creds, workerworker)
259
260        @login_d.addCallback
261        def logged_in(persp):
262            workerworker.setMasterPerspective(persp)
263
264            # set up to hear when the worker side disconnects
265            workerworker.detach_d = defer.Deferred()
266            persp.broker.notifyOnDisconnect(
267                lambda: workerworker.detach_d.callback(None))
268            self._detach_deferreds.append(workerworker.detach_d)
269
270            return workerworker
271
272        self.endpoint = clientFromString(
273                reactor, self.client_connection_string_tpl.format(port=self.port))
274        connected_d = self.endpoint.connect(factory)
275
276        dlist = [connected_d, login_d]
277        if waitForBuilderList:
278            dlist.append(setBuilderList_d)
279
280        d = defer.DeferredList(dlist,
281                               consumeErrors=True, fireOnOneErrback=True)
282        d.addCallback(lambda _: workerworker)
283        return d
284
285    def workerSideDisconnect(self, worker):
286        """Disconnect from the worker side"""
287        worker.master_persp.broker.transport.loseConnection()
288
289    @defer.inlineCallbacks
290    def test_connect_disconnect(self):
291        """Test a single worker connecting and disconnecting."""
292        yield self.addWorker()
293
294        # connect
295        worker = yield self.connectWorker()
296
297        # disconnect
298        self.workerSideDisconnect(worker)
299
300        # wait for the resulting detach
301        yield worker.waitForDetach()
302
303    @defer.inlineCallbacks
304    def test_tls_connect_disconnect(self):
305        """Test with TLS or SSL endpoint.
306
307        According to the deprecation note for the SSL client endpoint,
308        the TLS endpoint is supported from Twistd 16.0.
309
310        TODO add certificate verification (also will require some conditionals
311        on various versions, including PyOpenSSL, service_identity. The CA used
312        to generate the testing cert is in ``PKI_DIR/ca``
313        """
314        def escape_colon(path):
315            # on windows we can't have \ as it serves as the escape character for :
316            return path.replace('\\', '/').replace(':', '\\:')
317        self.server_connection_string = (
318            "ssl:port=0:certKey={pub}:privateKey={priv}:" +
319            "interface=127.0.0.1").format(
320                pub=escape_colon(os.path.join(PKI_DIR, '127.0.0.1.crt')),
321                priv=escape_colon(os.path.join(PKI_DIR, '127.0.0.1.key')))
322        self.client_connection_string_tpl = "ssl:host=127.0.0.1:port={port}"
323
324        yield self.addWorker()
325
326        # connect
327        worker = yield self.connectWorker()
328
329        # disconnect
330        self.workerSideDisconnect(worker)
331
332        # wait for the resulting detach
333        yield worker.waitForDetach()
334
335    @defer.inlineCallbacks
336    def test_worker_info(self):
337        yield self.addWorker()
338        worker = yield self.connectWorker()
339        props = self.buildworker.info
340        # check worker info passing
341        self.assertEqual(props.getProperty("info"),
342                         "here")
343        # check worker info passing with UTF-8
344        self.assertEqual(props.getProperty("os_release"),
345                         b'\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88'.decode())
346        self.assertEqual(props.getProperty(b'\xe3\x83\xaa\xe3\x83\xaa\xe3\x83\xbc\xe3\x82'
347                                           b'\xb9\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88'.decode()),
348                         b'\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88'.decode())
349        self.assertEqual(props.getProperty("none"), None)
350        self.assertEqual(props.getProperty("numcpus"), 1)
351
352        self.workerSideDisconnect(worker)
353        yield worker.waitForDetach()
354
355    @defer.inlineCallbacks
356    def _test_duplicate_worker(self):
357        yield self.addWorker()
358
359        # connect first worker
360        worker1 = yield self.connectWorker()
361
362        # connect second worker; this should fail
363        try:
364            yield self.connectWorker(waitForBuilderList=False)
365            connect_failed = False
366        except Exception:
367            connect_failed = True
368        self.assertTrue(connect_failed)
369
370        # disconnect both and wait for that to percolate
371        self.workerSideDisconnect(worker1)
372
373        yield worker1.waitForDetach()
374
375        # flush the exception logged for this on the master
376        self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 1)
377
378    @defer.inlineCallbacks
379    def _test_duplicate_worker_old_dead(self):
380        yield self.addWorker()
381
382        # connect first worker
383        worker1 = yield self.connectWorker()
384
385        # monkeypatch that worker to fail with PBConnectionLost when its
386        # remote_print method is called
387        def remote_print(message):
388            worker1.master_persp.broker.transport.loseConnection()
389            raise pb.PBConnectionLost("fake!")
390        worker1.remote_print = remote_print
391
392        # connect second worker; this should succeed, and the old worker
393        # should be disconnected.
394        worker2 = yield self.connectWorker()
395
396        # disconnect both and wait for that to percolate
397        self.workerSideDisconnect(worker2)
398
399        yield worker1.waitForDetach()
400
401        # flush the exception logged for this on the worker
402        self.assertEqual(len(self.flushLoggedErrors(pb.PBConnectionLost)), 1)
403