1""" 2mostly functional tests of gateways. 3""" 4import os 5import py 6import pytest 7import socket 8import subprocess 9import sys 10 11import execnet 12from execnet import gateway_base, gateway_io 13from test_serializer import _find_version 14TESTTIMEOUT = 10.0 # seconds 15needs_osdup = py.test.mark.skipif("not hasattr(os, 'dup')") 16 17 18skip_win_pypy = pytest.mark.xfail(condition=hasattr(sys, 'pypy_version_info') and sys.platform.startswith('win'), 19 reason='failing on Windows on PyPy (#63)') 20 21 22def fails(*args, **kwargs): 23 0/0 24 25 26def test_deprecation(recwarn, monkeypatch): 27 execnet.PopenGateway().exit() 28 assert recwarn.pop(DeprecationWarning) 29 monkeypatch.setattr(socket, 'socket', fails) 30 py.test.raises(Exception, 'execnet.SocketGateway("localhost", 8811)') 31 assert recwarn.pop(DeprecationWarning) 32 monkeypatch.setattr(subprocess, 'Popen', fails) 33 py.test.raises(Exception, 'execnet.SshGateway("not-existing")') 34 assert recwarn.pop(DeprecationWarning) 35 36 37class TestBasicGateway: 38 def test_correct_setup(self, gw): 39 assert gw.hasreceiver() 40 assert gw in gw._group 41 assert gw.id in gw._group 42 assert gw.spec 43 44 def test_repr_doesnt_crash(self, gw): 45 assert isinstance(repr(gw), str) 46 47 def test_attribute__name__(self, gw): 48 channel = gw.remote_exec("channel.send(__name__)") 49 name = channel.receive() 50 assert name == "__channelexec__" 51 52 def test_gateway_status_simple(self, gw): 53 status = gw.remote_status() 54 assert status.numexecuting == 0 55 56 def test_exc_info_is_clear_after_gateway_startup(self, gw): 57 ch = gw.remote_exec(""" 58 import traceback, sys 59 excinfo = sys.exc_info() 60 if excinfo != (None, None, None): 61 r = traceback.format_exception(*excinfo) 62 else: 63 r = 0 64 channel.send(r) 65 """) 66 res = ch.receive() 67 if res != 0: 68 pytest.fail("remote raised\n%s" % res) 69 70 def test_gateway_status_no_real_channel(self, gw): 71 numchan = gw._channelfactory.channels() 72 gw.remote_status() 73 numchan2 = gw._channelfactory.channels() 74 # note that on CPython this can not really 75 # fail because refcounting leads to immediate 76 # closure of temporary channels 77 assert numchan2 == numchan 78 79 def test_gateway_status_busy(self, gw): 80 numchannels = gw.remote_status().numchannels 81 ch1 = gw.remote_exec("channel.send(1); channel.receive()") 82 ch2 = gw.remote_exec("channel.receive()") 83 ch1.receive() 84 status = gw.remote_status() 85 assert status.numexecuting == 2 # number of active execution threads 86 assert status.numchannels == numchannels + 2 87 ch1.send(None) 88 ch2.send(None) 89 ch1.waitclose() 90 ch2.waitclose() 91 for i in range(10): 92 status = gw.remote_status() 93 if status.numexecuting == 0: 94 break 95 else: 96 pytest.fail("did not get correct remote status") 97 # race condition 98 assert status.numchannels <= numchannels 99 100 def test_remote_exec_module(self, tmpdir, gw): 101 p = tmpdir.join("remotetest.py") 102 p.write("channel.send(1)") 103 mod = type(os)("remotetest") 104 mod.__file__ = str(p) 105 channel = gw.remote_exec(mod) 106 name = channel.receive() 107 assert name == 1 108 p.write("channel.send(2)") 109 channel = gw.remote_exec(mod) 110 name = channel.receive() 111 assert name == 2 112 113 def test_correct_setup_no_py(self, gw): 114 channel = gw.remote_exec(""" 115 import sys 116 channel.send(list(sys.modules)) 117 """) 118 remotemodules = channel.receive() 119 assert 'py' not in remotemodules, ( 120 "py should not be imported on remote side") 121 122 def test_remote_exec_waitclose(self, gw): 123 channel = gw.remote_exec('pass') 124 channel.waitclose(TESTTIMEOUT) 125 126 def test_remote_exec_waitclose_2(self, gw): 127 channel = gw.remote_exec('def gccycle(): pass') 128 channel.waitclose(TESTTIMEOUT) 129 130 def test_remote_exec_waitclose_noarg(self, gw): 131 channel = gw.remote_exec('pass') 132 channel.waitclose() 133 134 def test_remote_exec_error_after_close(self, gw): 135 channel = gw.remote_exec('pass') 136 channel.waitclose(TESTTIMEOUT) 137 py.test.raises(IOError, channel.send, 0) 138 139 def test_remote_exec_no_explicit_close(self, gw): 140 channel = gw.remote_exec('channel.close()') 141 with pytest.raises(channel.RemoteError) as excinfo: 142 channel.waitclose(TESTTIMEOUT) 143 assert "explicit" in excinfo.value.formatted 144 145 def test_remote_exec_channel_anonymous(self, gw): 146 channel = gw.remote_exec(''' 147 obj = channel.receive() 148 channel.send(obj) 149 ''') 150 channel.send(42) 151 result = channel.receive() 152 assert result == 42 153 154 @needs_osdup 155 def test_confusion_from_os_write_stdout(self, gw): 156 channel = gw.remote_exec(""" 157 import os 158 os.write(1, 'confusion!'.encode('ascii')) 159 channel.send(channel.receive() * 6) 160 channel.send(channel.receive() * 6) 161 """) 162 channel.send(3) 163 res = channel.receive() 164 assert res == 18 165 channel.send(7) 166 res = channel.receive() 167 assert res == 42 168 169 @needs_osdup 170 def test_confusion_from_os_write_stderr(self, gw): 171 channel = gw.remote_exec(""" 172 import os 173 os.write(2, 'test'.encode('ascii')) 174 channel.send(channel.receive() * 6) 175 channel.send(channel.receive() * 6) 176 """) 177 channel.send(3) 178 res = channel.receive() 179 assert res == 18 180 channel.send(7) 181 res = channel.receive() 182 assert res == 42 183 184 def test__rinfo(self, gw): 185 rinfo = gw._rinfo() 186 assert rinfo.executable 187 assert rinfo.cwd 188 assert rinfo.version_info 189 assert repr(rinfo) 190 old = gw.remote_exec(""" 191 import os.path 192 cwd = os.getcwd() 193 channel.send(os.path.basename(cwd)) 194 os.chdir('..') 195 """).receive() 196 try: 197 rinfo2 = gw._rinfo() 198 assert rinfo2.cwd == rinfo.cwd 199 rinfo3 = gw._rinfo(update=True) 200 assert rinfo3.cwd != rinfo2.cwd 201 finally: 202 gw._cache_rinfo = rinfo 203 gw.remote_exec("import os ; os.chdir(%r)" % old).waitclose() 204 205 206class TestPopenGateway: 207 gwtype = 'popen' 208 209 def test_chdir_separation(self, tmpdir, makegateway): 210 old = tmpdir.chdir() 211 try: 212 gw = makegateway('popen') 213 finally: 214 waschangedir = old.chdir() 215 c = gw.remote_exec("import os ; channel.send(os.getcwd())") 216 x = c.receive() 217 assert x.lower() == str(waschangedir).lower() 218 219 def test_remoteerror_readable_traceback(self, gw): 220 with pytest.raises(gateway_base.RemoteError) as e: 221 gw.remote_exec("x y").waitclose() 222 assert "gateway_base" in e.value.formatted 223 224 def test_many_popen(self, makegateway): 225 num = 4 226 l = [] 227 for i in range(num): 228 l.append(makegateway('popen')) 229 channels = [] 230 for gw in l: 231 channel = gw.remote_exec("""channel.send(42)""") 232 channels.append(channel) 233 while channels: 234 channel = channels.pop() 235 ret = channel.receive() 236 assert ret == 42 237 238 def test_rinfo_popen(self, gw): 239 rinfo = gw._rinfo() 240 assert rinfo.executable == sys.executable 241 assert rinfo.cwd == os.getcwd() 242 assert rinfo.version_info == sys.version_info 243 244 def test_waitclose_on_remote_killed(self, makegateway): 245 gw = makegateway('popen') 246 channel = gw.remote_exec(""" 247 import os 248 import time 249 channel.send(os.getpid()) 250 time.sleep(100) 251 """) 252 remotepid = channel.receive() 253 py.process.kill(remotepid) 254 with pytest.raises(EOFError): 255 channel.waitclose(TESTTIMEOUT) 256 with pytest.raises(IOError): 257 channel.send(None) 258 with pytest.raises(EOFError): 259 channel.receive() 260 261 def test_receive_on_remote_sysexit(self, gw): 262 channel = gw.remote_exec(""" 263 raise SystemExit() 264 """) 265 py.test.raises(channel.RemoteError, channel.receive) 266 267 def test_dont_write_bytecode(self, makegateway): 268 check_sys_dont_write_bytecode = """ 269 import sys 270 channel.send(sys.dont_write_bytecode) 271 """ 272 273 gw = makegateway('popen') 274 channel = gw.remote_exec(check_sys_dont_write_bytecode) 275 ret = channel.receive() 276 assert not ret 277 gw = makegateway('popen//dont_write_bytecode') 278 channel = gw.remote_exec(check_sys_dont_write_bytecode) 279 ret = channel.receive() 280 assert ret 281 282 283@py.test.mark.skipif("config.option.broken_isp") 284def test_socket_gw_host_not_found(gw, makegateway): 285 py.test.raises( 286 execnet.HostNotFound, lambda: 287 makegateway("socket=qwepoipqwe:9000")) 288 289 290class TestSshPopenGateway: 291 gwtype = "ssh" 292 293 def test_sshconfig_config_parsing(self, monkeypatch, makegateway): 294 l = [] 295 monkeypatch.setattr( 296 gateway_io, "Popen2IOMaster", 297 lambda *args, **kwargs: l.append(args[0])) 298 py.test.raises( 299 AttributeError, lambda: 300 makegateway("ssh=xyz//ssh_config=qwe")) 301 302 assert len(l) == 1 303 popen_args = l[0] 304 i = popen_args.index('-F') 305 assert popen_args[i+1] == "qwe" 306 307 def test_sshaddress(self, gw, specssh): 308 assert gw.remoteaddress == specssh.ssh 309 310 def test_host_not_found(self, gw, makegateway): 311 py.test.raises( 312 execnet.HostNotFound, lambda: 313 makegateway('ssh=nowhere.codespeak.net')) 314 315 316class TestThreads: 317 def test_threads(self, makegateway): 318 gw = makegateway('popen') 319 gw.remote_init_threads(3) 320 c1 = gw.remote_exec("channel.send(channel.receive())") 321 c2 = gw.remote_exec("channel.send(channel.receive())") 322 c2.send(1) 323 res = c2.receive() 324 assert res == 1 325 c1.send(42) 326 res = c1.receive() 327 assert res == 42 328 329 def test_threads_race_sending(self, makegateway): 330 # multiple threads sending data in parallel 331 gw = makegateway("popen") 332 num = 5 333 gw.remote_init_threads(num) 334 print("remote_init_threads(%d)" % num) 335 channels = [] 336 for x in range(num): 337 ch = gw.remote_exec(""" 338 for x in range(10): 339 channel.send(''*1000) 340 channel.receive() 341 """) 342 channels.append(ch) 343 for ch in channels: 344 for x in range(10): 345 ch.receive(TESTTIMEOUT) 346 ch.send(1) 347 for ch in channels: 348 ch.waitclose(TESTTIMEOUT) 349 350 def test_status_with_threads(self, makegateway): 351 gw = makegateway('popen') 352 c1 = gw.remote_exec("channel.send(1) ; channel.receive()") 353 c2 = gw.remote_exec("channel.send(2) ; channel.receive()") 354 c1.receive() 355 c2.receive() 356 rstatus = gw.remote_status() 357 assert rstatus.numexecuting == 2 358 c1.send(1) 359 c2.send(1) 360 c1.waitclose() 361 c2.waitclose() 362 # there is a slight chance that an execution thread 363 # is still active although it's accompanying channel 364 # is already closed. 365 for i in range(10): 366 rstatus = gw.remote_status() 367 if rstatus.numexecuting == 0: 368 return 369 assert 0, "numexecuting didn't drop to zero" 370 371 372class TestTracing: 373 def test_popen_filetracing(self, testdir, monkeypatch, makegateway): 374 tmpdir = testdir.tmpdir 375 monkeypatch.setenv("TMP", str(tmpdir)) 376 monkeypatch.setenv("TEMP", str(tmpdir)) # windows 377 monkeypatch.setenv('EXECNET_DEBUG', "1") 378 gw = makegateway("popen") 379 # hack out the debuffilename 380 fn = gw.remote_exec( 381 "import execnet;channel.send(execnet.gateway_base.fn)" 382 ).receive() 383 slavefile = py.path.local(fn) 384 assert slavefile.check() 385 slave_line = "creating slavegateway" 386 for line in slavefile.readlines(): 387 if slave_line in line: 388 break 389 else: 390 py.test.fail("did not find {!r} in tracefile".format(slave_line)) 391 gw.exit() 392 393 @skip_win_pypy 394 def test_popen_stderr_tracing(self, capfd, monkeypatch, makegateway): 395 monkeypatch.setenv('EXECNET_DEBUG', "2") 396 gw = makegateway("popen") 397 pid = gw.remote_exec("import os ; channel.send(os.getpid())").receive() 398 out, err = capfd.readouterr() 399 slave_line = "[%s] creating slavegateway" % pid 400 assert slave_line in err 401 gw.exit() 402 403 def test_no_tracing_by_default(self): 404 assert gateway_base.trace == gateway_base.notrace, \ 405 "trace does not to default to empty tracing" 406 407 408class TestStringCoerce: 409 @py.test.mark.skipif('sys.version>="3.0"') 410 def test_2to3(self, makegateway): 411 python = _find_version('3') 412 gw = makegateway('popen//python=%s' % python) 413 ch = gw.remote_exec('channel.send(channel.receive())') 414 ch.send('a') 415 res = ch.receive() 416 assert isinstance(res, unicode) 417 418 gw.reconfigure(py3str_as_py2str=True) 419 420 ch = gw.remote_exec('channel.send(channel.receive())') 421 ch.send('a') 422 res = ch.receive() 423 assert isinstance(res, str) 424 gw.exit() 425 426 @py.test.mark.skipif('sys.version<"3.0"') 427 def test_3to2(self, makegateway): 428 python = _find_version('2') 429 gw = makegateway('popen//python=%s' % python) 430 431 ch = gw.remote_exec('channel.send(channel.receive())') 432 ch.send(bytes('a', 'ascii')) 433 res = ch.receive() 434 assert isinstance(res, str) 435 436 gw.reconfigure(py3str_as_py2str=True, py2str_as_py3str=False) 437 438 ch = gw.remote_exec('channel.send(channel.receive())') 439 ch.send('a') 440 res = ch.receive() 441 assert isinstance(res, bytes) 442 gw.exit() 443 444 445@pytest.mark.parametrize('spec, expected_args', [ 446 ('popen//python=python', ['python']), 447 ('popen//python=sudo -u test python', ['sudo', '-u', 'test', 'python']), 448 pytest.param(r'popen//python=/hans\ alt/bin/python', ['/hans alt/bin/python'], 449 marks=pytest.mark.skipif(sys.platform.startswith('win'), reason='invalid spec on Windows')), 450 ('popen//python="/u/test me/python" -e', ['/u/test me/python', '-e']), 451]) 452def test_popen_args(spec, expected_args): 453 expected_args = expected_args + [ 454 '-u', '-c', gateway_io.popen_bootstrapline] 455 args = gateway_io.popen_args(execnet.XSpec(spec)) 456 assert args == expected_args 457