1# This file is part of Buildbot. Buildbot is free software: you can 2# redistribute it and/or modify it under the terms of the GNU General Public 3# License as published by the Free Software Foundation, version 2. 4# 5# This program is distributed in the hope that it will be useful, but WITHOUT 6# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 7# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more 8# details. 9# 10# You should have received a copy of the GNU General Public License along with 11# this program; if not, write to the Free Software Foundation, Inc., 51 12# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 13# 14# Copyright Buildbot Team Members 15 16 17import os 18 19from twisted.cred import credentials 20from twisted.internet import defer 21from twisted.internet import reactor 22from twisted.internet.endpoints import clientFromString 23from twisted.python import log 24from twisted.python import util 25from twisted.spread import pb 26from twisted.trial import unittest 27 28import buildbot 29from buildbot import config 30from buildbot import pbmanager 31from buildbot import worker 32from buildbot.process import botmaster 33from buildbot.process import builder 34from buildbot.process import factory 35from buildbot.test.fake import fakemaster 36from buildbot.test.util.misc import TestReactorMixin 37from buildbot.util.eventual import eventually 38from buildbot.worker import manager as workermanager 39 40PKI_DIR = util.sibpath(__file__, 'pki') 41 42 43class FakeWorkerForBuilder(pb.Referenceable): 44 45 """ 46 Fake worker-side WorkerForBuilder object 47 """ 48 49 50class FakeWorkerWorker(pb.Referenceable): 51 52 """ 53 Fake worker-side Worker object 54 55 @ivar master_persp: remote perspective on the master 56 """ 57 58 def __init__(self, callWhenBuilderListSet): 59 self.callWhenBuilderListSet = callWhenBuilderListSet 60 self.master_persp = None 61 self._detach_deferreds = [] 62 self._detached = False 63 64 def waitForDetach(self): 65 if self._detached: 66 return defer.succeed(None) 67 d = defer.Deferred() 68 self._detach_deferreds.append(d) 69 return d 70 71 def setMasterPerspective(self, persp): 72 self.master_persp = persp 73 # clear out master_persp on disconnect 74 75 def clear_persp(): 76 self.master_persp = None 77 persp.broker.notifyOnDisconnect(clear_persp) 78 79 def fire_deferreds(): 80 self._detached = True 81 self._detach_deferreds, deferreds = None, self._detach_deferreds 82 for d in deferreds: 83 d.callback(None) 84 persp.broker.notifyOnDisconnect(fire_deferreds) 85 86 def remote_print(self, message): 87 log.msg("WORKER-SIDE: remote_print(%r)" % (message,)) 88 89 def remote_getWorkerInfo(self): 90 return { 91 'info': 'here', 92 'worker_commands': { 93 'x': 1, 94 }, 95 'numcpus': 1, 96 'none': None, 97 'os_release': b'\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88'.decode(), 98 b'\xe3\x83\xaa\xe3\x83\xaa\xe3\x83\xbc\xe3\x82\xb9\xe3' 99 b'\x83\x86\xe3\x82\xb9\xe3\x83\x88'.decode(): 100 b'\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88'.decode(), 101 } 102 103 def remote_getVersion(self): 104 return buildbot.version 105 106 def remote_getCommands(self): 107 return {'x': 1} 108 109 def remote_setBuilderList(self, builder_info): 110 builder_names = [n for n, dir in builder_info] 111 slbuilders = [FakeWorkerForBuilder() for n in builder_names] 112 eventually(self.callWhenBuilderListSet) 113 return dict(zip(builder_names, slbuilders)) 114 115 116class FakeBuilder(builder.Builder): 117 118 def attached(self, worker, commands): 119 return defer.succeed(None) 120 121 def detached(self, worker): 122 pass 123 124 def getOldestRequestTime(self): 125 return 0 126 127 def maybeStartBuild(self): 128 return defer.succeed(None) 129 130 131class MyWorker(worker.Worker): 132 133 def attached(self, conn): 134 self.detach_d = defer.Deferred() 135 return super().attached(conn) 136 137 def detached(self): 138 super().detached() 139 self.detach_d, d = None, self.detach_d 140 d.callback(None) 141 142 143class TestWorkerComm(unittest.TestCase, TestReactorMixin): 144 145 """ 146 Test handling of connections from workers as integrated with 147 - Twisted Spread 148 - real TCP connections. 149 - PBManager 150 151 @ivar master: fake build master 152 @ivar pbamanger: L{PBManager} instance 153 @ivar botmaster: L{BotMaster} instance 154 @ivar worker: master-side L{Worker} instance 155 @ivar workerworker: worker-side L{FakeWorkerWorker} instance 156 @ivar port: TCP port to connect to 157 @ivar server_connection_string: description string for the server endpoint 158 @ivar client_connection_string_tpl: description string template for the client 159 endpoint (expects to passed 'port') 160 @ivar endpoint: endpoint controlling the outbound connection 161 from worker to master 162 """ 163 164 @defer.inlineCallbacks 165 def setUp(self): 166 self.setUpTestReactor() 167 self.master = fakemaster.make_master(self, wantMq=True, wantData=True, 168 wantDb=True) 169 170 # set the worker port to a loopback address with unspecified 171 # port 172 self.pbmanager = self.master.pbmanager = pbmanager.PBManager() 173 yield self.pbmanager.setServiceParent(self.master) 174 175 # remove the fakeServiceParent from fake service hierarchy, and replace 176 # by a real one 177 yield self.master.workers.disownServiceParent() 178 self.workers = self.master.workers = workermanager.WorkerManager( 179 self.master) 180 yield self.workers.setServiceParent(self.master) 181 182 self.botmaster = botmaster.BotMaster() 183 yield self.botmaster.setServiceParent(self.master) 184 185 self.master.botmaster = self.botmaster 186 self.master.data.updates.workerConfigured = lambda *a, **k: None 187 yield self.master.startService() 188 189 self.buildworker = None 190 self.port = None 191 self.workerworker = None 192 self.endpoint = None 193 self.broker = None 194 self._detach_deferreds = [] 195 196 # patch in our FakeBuilder for the regular Builder class 197 self.patch(botmaster, 'Builder', FakeBuilder) 198 199 self.server_connection_string = "tcp:0:interface=127.0.0.1" 200 self.client_connection_string_tpl = "tcp:host=127.0.0.1:port={port}" 201 202 def tearDown(self): 203 if self.broker: 204 del self.broker 205 if self.endpoint: 206 del self.endpoint 207 deferreds = self._detach_deferreds + [ 208 self.pbmanager.stopService(), 209 self.botmaster.stopService(), 210 self.workers.stopService(), 211 ] 212 213 # if the worker is still attached, wait for it to detach, too 214 if self.buildworker and self.buildworker.detach_d: 215 deferreds.append(self.buildworker.detach_d) 216 217 return defer.gatherResults(deferreds) 218 219 @defer.inlineCallbacks 220 def addWorker(self, **kwargs): 221 """ 222 Create a master-side worker instance and add it to the BotMaster 223 224 @param **kwargs: arguments to pass to the L{Worker} constructor. 225 """ 226 self.buildworker = MyWorker("testworker", "pw", **kwargs) 227 228 # reconfig the master to get it set up 229 new_config = self.master.config 230 new_config.protocols = {"pb": {"port": self.server_connection_string}} 231 new_config.workers = [self.buildworker] 232 new_config.builders = [config.BuilderConfig( 233 name='bldr', 234 workername='testworker', factory=factory.BuildFactory())] 235 236 yield self.botmaster.reconfigServiceWithBuildbotConfig(new_config) 237 yield self.workers.reconfigServiceWithBuildbotConfig(new_config) 238 239 # as part of the reconfig, the worker registered with the pbmanager, so 240 # get the port it was assigned 241 self.port = self.buildworker.registration.getPBPort() 242 243 def connectWorker(self, waitForBuilderList=True): 244 """ 245 Connect a worker the master via PB 246 247 @param waitForBuilderList: don't return until the setBuilderList has 248 been called 249 @returns: L{FakeWorkerWorker} and a Deferred that will fire when it 250 is detached; via deferred 251 """ 252 factory = pb.PBClientFactory() 253 creds = credentials.UsernamePassword(b"testworker", b"pw") 254 setBuilderList_d = defer.Deferred() 255 workerworker = FakeWorkerWorker( 256 lambda: setBuilderList_d.callback(None)) 257 258 login_d = factory.login(creds, workerworker) 259 260 @login_d.addCallback 261 def logged_in(persp): 262 workerworker.setMasterPerspective(persp) 263 264 # set up to hear when the worker side disconnects 265 workerworker.detach_d = defer.Deferred() 266 persp.broker.notifyOnDisconnect( 267 lambda: workerworker.detach_d.callback(None)) 268 self._detach_deferreds.append(workerworker.detach_d) 269 270 return workerworker 271 272 self.endpoint = clientFromString( 273 reactor, self.client_connection_string_tpl.format(port=self.port)) 274 connected_d = self.endpoint.connect(factory) 275 276 dlist = [connected_d, login_d] 277 if waitForBuilderList: 278 dlist.append(setBuilderList_d) 279 280 d = defer.DeferredList(dlist, 281 consumeErrors=True, fireOnOneErrback=True) 282 d.addCallback(lambda _: workerworker) 283 return d 284 285 def workerSideDisconnect(self, worker): 286 """Disconnect from the worker side""" 287 worker.master_persp.broker.transport.loseConnection() 288 289 @defer.inlineCallbacks 290 def test_connect_disconnect(self): 291 """Test a single worker connecting and disconnecting.""" 292 yield self.addWorker() 293 294 # connect 295 worker = yield self.connectWorker() 296 297 # disconnect 298 self.workerSideDisconnect(worker) 299 300 # wait for the resulting detach 301 yield worker.waitForDetach() 302 303 @defer.inlineCallbacks 304 def test_tls_connect_disconnect(self): 305 """Test with TLS or SSL endpoint. 306 307 According to the deprecation note for the SSL client endpoint, 308 the TLS endpoint is supported from Twistd 16.0. 309 310 TODO add certificate verification (also will require some conditionals 311 on various versions, including PyOpenSSL, service_identity. The CA used 312 to generate the testing cert is in ``PKI_DIR/ca`` 313 """ 314 def escape_colon(path): 315 # on windows we can't have \ as it serves as the escape character for : 316 return path.replace('\\', '/').replace(':', '\\:') 317 self.server_connection_string = ( 318 "ssl:port=0:certKey={pub}:privateKey={priv}:" + 319 "interface=127.0.0.1").format( 320 pub=escape_colon(os.path.join(PKI_DIR, '127.0.0.1.crt')), 321 priv=escape_colon(os.path.join(PKI_DIR, '127.0.0.1.key'))) 322 self.client_connection_string_tpl = "ssl:host=127.0.0.1:port={port}" 323 324 yield self.addWorker() 325 326 # connect 327 worker = yield self.connectWorker() 328 329 # disconnect 330 self.workerSideDisconnect(worker) 331 332 # wait for the resulting detach 333 yield worker.waitForDetach() 334 335 @defer.inlineCallbacks 336 def test_worker_info(self): 337 yield self.addWorker() 338 worker = yield self.connectWorker() 339 props = self.buildworker.info 340 # check worker info passing 341 self.assertEqual(props.getProperty("info"), 342 "here") 343 # check worker info passing with UTF-8 344 self.assertEqual(props.getProperty("os_release"), 345 b'\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88'.decode()) 346 self.assertEqual(props.getProperty(b'\xe3\x83\xaa\xe3\x83\xaa\xe3\x83\xbc\xe3\x82' 347 b'\xb9\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88'.decode()), 348 b'\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88'.decode()) 349 self.assertEqual(props.getProperty("none"), None) 350 self.assertEqual(props.getProperty("numcpus"), 1) 351 352 self.workerSideDisconnect(worker) 353 yield worker.waitForDetach() 354 355 @defer.inlineCallbacks 356 def _test_duplicate_worker(self): 357 yield self.addWorker() 358 359 # connect first worker 360 worker1 = yield self.connectWorker() 361 362 # connect second worker; this should fail 363 try: 364 yield self.connectWorker(waitForBuilderList=False) 365 connect_failed = False 366 except Exception: 367 connect_failed = True 368 self.assertTrue(connect_failed) 369 370 # disconnect both and wait for that to percolate 371 self.workerSideDisconnect(worker1) 372 373 yield worker1.waitForDetach() 374 375 # flush the exception logged for this on the master 376 self.assertEqual(len(self.flushLoggedErrors(RuntimeError)), 1) 377 378 @defer.inlineCallbacks 379 def _test_duplicate_worker_old_dead(self): 380 yield self.addWorker() 381 382 # connect first worker 383 worker1 = yield self.connectWorker() 384 385 # monkeypatch that worker to fail with PBConnectionLost when its 386 # remote_print method is called 387 def remote_print(message): 388 worker1.master_persp.broker.transport.loseConnection() 389 raise pb.PBConnectionLost("fake!") 390 worker1.remote_print = remote_print 391 392 # connect second worker; this should succeed, and the old worker 393 # should be disconnected. 394 worker2 = yield self.connectWorker() 395 396 # disconnect both and wait for that to percolate 397 self.workerSideDisconnect(worker2) 398 399 yield worker1.waitForDetach() 400 401 # flush the exception logged for this on the worker 402 self.assertEqual(len(self.flushLoggedErrors(pb.PBConnectionLost)), 1) 403