1"""
2mostly functional tests of gateways.
3"""
4import os
5import py
6import pytest
7import socket
8import subprocess
9import sys
10
11import execnet
12from execnet import gateway_base, gateway_io
13from test_serializer import _find_version
# Upper bound for blocking channel operations (receive/waitclose) in tests.
TESTTIMEOUT = 10.0  # seconds

# Skip marker applied when the ``os`` module has no ``dup`` attribute.
# A boolean condition plus explicit reason is used, consistent with the
# other markers in this module (string conditions hide the reason).
needs_osdup = pytest.mark.skipif(
    not hasattr(os, 'dup'), reason="os.dup is not available")


# Expected-failure marker for tests failing on PyPy running on Windows.
skip_win_pypy = pytest.mark.xfail(
    condition=(hasattr(sys, 'pypy_version_info')
               and sys.platform.startswith('win')),
    reason='failing on Windows on PyPy (#63)')
20
21
def fails(*args, **kwargs):
    """Raise ZeroDivisionError no matter which arguments are passed.

    Serves as a drop-in replacement for callables that must not succeed.
    """
    raise ZeroDivisionError("division by zero")
24
25
def test_deprecation(recwarn, monkeypatch):
    """Check that the legacy gateway constructors record a DeprecationWarning.

    ``socket.socket`` and ``subprocess.Popen`` are monkeypatched with
    ``fails``; the socket and ssh constructors are then expected to raise,
    and a DeprecationWarning must have been recorded for each of them.
    """
    execnet.PopenGateway().exit()
    assert recwarn.pop(DeprecationWarning)

    monkeypatch.setattr(socket, 'socket', fails)
    # Context-manager form: passing source code as a string to raises()
    # is no longer supported by current pytest releases.
    with pytest.raises(Exception):
        execnet.SocketGateway("localhost", 8811)
    assert recwarn.pop(DeprecationWarning)

    monkeypatch.setattr(subprocess, 'Popen', fails)
    with pytest.raises(Exception):
        execnet.SshGateway("not-existing")
    assert recwarn.pop(DeprecationWarning)
35
36
class TestBasicGateway:
    """Functional checks run against the gateway supplied by the ``gw`` fixture."""

    def test_correct_setup(self, gw):
        """A gateway has a receiver, a spec, and is registered in its group."""
        assert gw.hasreceiver()
        assert gw in gw._group
        assert gw.id in gw._group
        assert gw.spec

    def test_repr_doesnt_crash(self, gw):
        """``repr(gw)`` returns a string."""
        assert isinstance(repr(gw), str)

    def test_attribute__name__(self, gw):
        """Remotely executed source sees ``__name__ == "__channelexec__"``."""
        channel = gw.remote_exec("channel.send(__name__)")
        name = channel.receive()
        assert name == "__channelexec__"

    def test_gateway_status_simple(self, gw):
        """An idle gateway reports zero executing tasks."""
        status = gw.remote_status()
        assert status.numexecuting == 0

    def test_exc_info_is_clear_after_gateway_startup(self, gw):
        """``sys.exc_info()`` on the remote side is empty after startup."""
        ch = gw.remote_exec("""
                import traceback, sys
                excinfo = sys.exc_info()
                if excinfo != (None, None, None):
                    r = traceback.format_exception(*excinfo)
                else:
                    r = 0
                channel.send(r)
        """)
        res = ch.receive()
        if res != 0:
            pytest.fail("remote raised\n%s" % res)

    def test_gateway_status_no_real_channel(self, gw):
        """``remote_status()`` leaves the channel factory's channels unchanged."""
        numchan = gw._channelfactory.channels()
        gw.remote_status()
        numchan2 = gw._channelfactory.channels()
        # note that on CPython this can not really
        # fail because refcounting leads to immediate
        # closure of temporary channels
        assert numchan2 == numchan

    def test_gateway_status_busy(self, gw):
        """Status counts two executions while two remote tasks block on receive."""
        numchannels = gw.remote_status().numchannels
        ch1 = gw.remote_exec("channel.send(1); channel.receive()")
        ch2 = gw.remote_exec("channel.receive()")
        # Bounded waits throughout: a stuck remote side should fail the
        # test after TESTTIMEOUT instead of hanging the whole test run.
        ch1.receive(TESTTIMEOUT)
        status = gw.remote_status()
        assert status.numexecuting == 2  # number of active execution threads
        assert status.numchannels == numchannels + 2
        ch1.send(None)
        ch2.send(None)
        ch1.waitclose(TESTTIMEOUT)
        ch2.waitclose(TESTTIMEOUT)
        # An execution may still be counted shortly after its channel
        # closed, so poll the status a few times before giving up.
        for _ in range(10):
            status = gw.remote_status()
            if status.numexecuting == 0:
                break
        else:
            pytest.fail("did not get correct remote status")
        # race condition
        assert status.numchannels <= numchannels

    def test_remote_exec_module(self, tmpdir, gw):
        """``remote_exec`` accepts a module object and runs its ``__file__`` content."""
        p = tmpdir.join("remotetest.py")
        p.write("channel.send(1)")
        mod = type(os)("remotetest")
        mod.__file__ = str(p)
        channel = gw.remote_exec(mod)
        name = channel.receive()
        assert name == 1
        # Rewriting the file must be picked up by the next remote_exec.
        p.write("channel.send(2)")
        channel = gw.remote_exec(mod)
        name = channel.receive()
        assert name == 2

    def test_correct_setup_no_py(self, gw):
        """The remote side must not have ``py`` in ``sys.modules``."""
        channel = gw.remote_exec("""
            import sys
            channel.send(list(sys.modules))
        """)
        remotemodules = channel.receive()
        assert 'py' not in remotemodules, (
            "py should not be imported on remote side")

    def test_remote_exec_waitclose(self, gw):
        """The channel of a trivial remote execution closes within the timeout."""
        channel = gw.remote_exec('pass')
        channel.waitclose(TESTTIMEOUT)

    def test_remote_exec_waitclose_2(self, gw):
        """Same as above for source that only defines a function."""
        channel = gw.remote_exec('def gccycle(): pass')
        channel.waitclose(TESTTIMEOUT)

    def test_remote_exec_waitclose_noarg(self, gw):
        """``waitclose()`` can be called without a timeout argument."""
        channel = gw.remote_exec('pass')
        channel.waitclose()

    def test_remote_exec_error_after_close(self, gw):
        """Sending on a channel after it closed raises IOError."""
        channel = gw.remote_exec('pass')
        channel.waitclose(TESTTIMEOUT)
        with pytest.raises(IOError):
            channel.send(0)

    def test_remote_exec_no_explicit_close(self, gw):
        """A remote ``channel.close()`` surfaces as RemoteError mentioning "explicit"."""
        channel = gw.remote_exec('channel.close()')
        with pytest.raises(channel.RemoteError) as excinfo:
            channel.waitclose(TESTTIMEOUT)
        assert "explicit" in excinfo.value.formatted

    def test_remote_exec_channel_anonymous(self, gw):
        """A value sent to the remote side is echoed back unchanged."""
        channel = gw.remote_exec('''
           obj = channel.receive()
           channel.send(obj)
        ''')
        channel.send(42)
        result = channel.receive()
        assert result == 42

    @needs_osdup
    def test_confusion_from_os_write_stdout(self, gw):
        """Remote raw writes to fd 1 must not disturb channel traffic."""
        channel = gw.remote_exec("""
            import os
            os.write(1, 'confusion!'.encode('ascii'))
            channel.send(channel.receive() * 6)
            channel.send(channel.receive() * 6)
        """)
        channel.send(3)
        res = channel.receive()
        assert res == 18
        channel.send(7)
        res = channel.receive()
        assert res == 42

    @needs_osdup
    def test_confusion_from_os_write_stderr(self, gw):
        """Remote raw writes to fd 2 must not disturb channel traffic."""
        channel = gw.remote_exec("""
            import os
            os.write(2, 'test'.encode('ascii'))
            channel.send(channel.receive() * 6)
            channel.send(channel.receive() * 6)
        """)
        channel.send(3)
        res = channel.receive()
        assert res == 18
        channel.send(7)
        res = channel.receive()
        assert res == 42

    def test__rinfo(self, gw):
        """``_rinfo()`` is cached until called with ``update=True``."""
        rinfo = gw._rinfo()
        assert rinfo.executable
        assert rinfo.cwd
        assert rinfo.version_info
        assert repr(rinfo)
        # Move the remote process one directory up; the basename of the
        # previous cwd is sent back so the cleanup can chdir into it again.
        old = gw.remote_exec("""
            import os.path
            cwd = os.getcwd()
            channel.send(os.path.basename(cwd))
            os.chdir('..')
        """).receive()
        try:
            rinfo2 = gw._rinfo()
            assert rinfo2.cwd == rinfo.cwd
            rinfo3 = gw._rinfo(update=True)
            assert rinfo3.cwd != rinfo2.cwd
        finally:
            # Restore both the cached info and the remote working directory.
            gw._cache_rinfo = rinfo
            gw.remote_exec(
                "import os ; os.chdir(%r)" % old).waitclose(TESTTIMEOUT)
204
205
class TestPopenGateway:
    """Tests that specifically exercise ``popen`` gateways."""

    gwtype = 'popen'

    def test_chdir_separation(self, tmpdir, makegateway):
        """The remote cwd matches the directory active at gateway creation."""
        previous = tmpdir.chdir()
        try:
            gateway = makegateway('popen')
        finally:
            # chdir() hands back the directory we are leaving again.
            waschangedir = previous.chdir()
        remote_cwd = gateway.remote_exec(
            "import os ; channel.send(os.getcwd())").receive()
        assert remote_cwd.lower() == str(waschangedir).lower()

    def test_remoteerror_readable_traceback(self, gw):
        """A remote error's formatted traceback mentions ``gateway_base``."""
        with pytest.raises(gateway_base.RemoteError) as excinfo:
            gw.remote_exec("x y").waitclose()
        assert "gateway_base" in excinfo.value.formatted

    def test_many_popen(self, makegateway):
        """Several popen gateways can be created and answer independently."""
        num = 4
        gateways = [makegateway('popen') for _ in range(num)]
        pending = [
            gateway.remote_exec("""channel.send(42)""")
            for gateway in gateways
        ]
        # Drain from the end of the list, one channel at a time.
        while pending:
            channel = pending.pop()
            answer = channel.receive()
            assert answer == 42

    def test_rinfo_popen(self, gw):
        """Remote info of a popen gateway mirrors the local interpreter."""
        info = gw._rinfo()
        assert info.executable == sys.executable
        assert info.cwd == os.getcwd()
        assert info.version_info == sys.version_info

    def test_waitclose_on_remote_killed(self, makegateway):
        """Killing the remote process makes channel operations raise."""
        gateway = makegateway('popen')
        channel = gateway.remote_exec("""
            import os
            import time
            channel.send(os.getpid())
            time.sleep(100)
        """)
        victim_pid = channel.receive()
        py.process.kill(victim_pid)
        with pytest.raises(EOFError):
            channel.waitclose(TESTTIMEOUT)
        with pytest.raises(IOError):
            channel.send(None)
        with pytest.raises(EOFError):
            channel.receive()

    def test_receive_on_remote_sysexit(self, gw):
        """A remote ``SystemExit`` is reported as RemoteError on receive."""
        channel = gw.remote_exec("""
            raise SystemExit()
        """)
        with pytest.raises(channel.RemoteError):
            channel.receive()

    def test_dont_write_bytecode(self, makegateway):
        """``dont_write_bytecode`` in the spec toggles the remote sys flag."""
        probe = """
            import sys
            channel.send(sys.dont_write_bytecode)
        """
        expectations = (
            ('popen', False),
            ('popen//dont_write_bytecode', True),
        )
        for spec, expected in expectations:
            gateway = makegateway(spec)
            flag = gateway.remote_exec(probe).receive()
            assert bool(flag) is expected
281
282
@py.test.mark.skipif("config.option.broken_isp")
def test_socket_gw_host_not_found(gw, makegateway):
    """Creating a socket gateway for a bogus host name raises HostNotFound."""
    with pytest.raises(execnet.HostNotFound):
        makegateway("socket=qwepoipqwe:9000")
288
289
class TestSshPopenGateway:
    """Tests for gateways created from ``ssh`` specs."""

    gwtype = "ssh"

    def test_sshconfig_config_parsing(self, monkeypatch, makegateway):
        """``ssh_config=...`` in the spec ends up as ``-F <file>`` in the args."""
        recorded = []

        def record_popen_args(*args, **kwargs):
            # Capture the first positional argument; returning None makes
            # the subsequent gateway setup fail with AttributeError.
            recorded.append(args[0])

        monkeypatch.setattr(gateway_io, "Popen2IOMaster", record_popen_args)
        with pytest.raises(AttributeError):
            makegateway("ssh=xyz//ssh_config=qwe")

        assert len(recorded) == 1
        popen_args = recorded[0]
        flag_position = popen_args.index('-F')
        assert popen_args[flag_position + 1] == "qwe"

    def test_sshaddress(self, gw, specssh):
        """The gateway's remote address equals the ssh target of the spec."""
        assert gw.remoteaddress == specssh.ssh

    def test_host_not_found(self, gw, makegateway):
        """An ssh spec for the given bogus host raises HostNotFound."""
        with pytest.raises(execnet.HostNotFound):
            makegateway('ssh=nowhere.codespeak.net')
314
315
class TestThreads:
    """Tests for concurrent remote executions on a popen gateway."""

    def test_threads(self, makegateway):
        """Two echo executions can be served in either order."""
        gw = makegateway('popen')
        gw.remote_init_threads(3)
        c1 = gw.remote_exec("channel.send(channel.receive())")
        c2 = gw.remote_exec("channel.send(channel.receive())")
        # Serve the second channel first; this only works if both
        # executions are active at the same time.
        c2.send(1)
        res = c2.receive()
        assert res == 1
        c1.send(42)
        res = c1.receive()
        assert res == 42

    def test_threads_race_sending(self, makegateway):
        """Several remote executions send payloads in parallel."""
        # multiple threads sending data in parallel
        gw = makegateway("popen")
        num = 5
        gw.remote_init_threads(num)
        print("remote_init_threads(%d)" % num)
        channels = []
        for _ in range(num):
            # NOTE: the payload must be a non-empty character repeated;
            # ''*1000 is just the empty string and sends no data at all.
            ch = gw.remote_exec("""
                for x in range(10):
                    channel.send('x'*1000)
                channel.receive()
            """)
            channels.append(ch)
        for ch in channels:
            for _ in range(10):
                payload = ch.receive(TESTTIMEOUT)
                assert len(payload) == 1000
            ch.send(1)
        for ch in channels:
            ch.waitclose(TESTTIMEOUT)

    def test_status_with_threads(self, makegateway):
        """``numexecuting`` tracks two blocked executions and drops to zero."""
        gw = makegateway('popen')
        c1 = gw.remote_exec("channel.send(1) ; channel.receive()")
        c2 = gw.remote_exec("channel.send(2) ; channel.receive()")
        # Bounded waits so a stuck remote side fails instead of hanging.
        c1.receive(TESTTIMEOUT)
        c2.receive(TESTTIMEOUT)
        rstatus = gw.remote_status()
        assert rstatus.numexecuting == 2
        c1.send(1)
        c2.send(1)
        c1.waitclose(TESTTIMEOUT)
        c2.waitclose(TESTTIMEOUT)
        # there is a slight chance that an execution thread
        # is still active although its accompanying channel
        # is already closed.
        for _ in range(10):
            rstatus = gw.remote_status()
            if rstatus.numexecuting == 0:
                break
        else:
            pytest.fail("numexecuting didn't drop to zero")
370
371
class TestTracing:
    """Tests for the ``EXECNET_DEBUG`` tracing facility."""

    def test_popen_filetracing(self, testdir, monkeypatch, makegateway):
        """With ``EXECNET_DEBUG=1`` the remote side writes a trace file."""
        tracedir = str(testdir.tmpdir)
        monkeypatch.setenv("TMP", tracedir)
        monkeypatch.setenv("TEMP", tracedir)  # windows
        monkeypatch.setenv('EXECNET_DEBUG', "1")
        gateway = makegateway("popen")
        # Ask the remote side for the name of its debug file.
        remote_filename = gateway.remote_exec(
            "import execnet;channel.send(execnet.gateway_base.fn)"
        ).receive()
        tracefile = py.path.local(remote_filename)
        assert tracefile.check()
        slave_line = "creating slavegateway"
        if not any(slave_line in line for line in tracefile.readlines()):
            pytest.fail("did not find {!r} in tracefile".format(slave_line))
        gateway.exit()

    @skip_win_pypy
    def test_popen_stderr_tracing(self, capfd, monkeypatch, makegateway):
        """With ``EXECNET_DEBUG=2`` the trace shows up on captured stderr."""
        monkeypatch.setenv('EXECNET_DEBUG', "2")
        gateway = makegateway("popen")
        remote_pid = gateway.remote_exec(
            "import os ; channel.send(os.getpid())").receive()
        captured_err = capfd.readouterr()[1]
        expected = "[%s] creating slavegateway" % remote_pid
        assert expected in captured_err
        gateway.exit()

    def test_no_tracing_by_default(self):
        """Without configuration, ``trace`` is the no-op ``notrace``."""
        message = "trace does not to default to empty tracing"
        assert gateway_base.trace == gateway_base.notrace, message
406
407
class TestStringCoerce:
    """String type coercion between Python 2 and Python 3 interpreters."""

    # The skip conditions compare ``sys.version_info`` tuples; comparing
    # ``sys.version`` strings is lexicographic and only works by accident.
    @pytest.mark.skipif(sys.version_info >= (3,),
                        reason="needs a Python 2 host interpreter")
    def test_2to3(self, makegateway):
        """Python 2 host talking to a Python 3 remote interpreter."""
        python = _find_version('3')
        gw = makegateway('popen//python=%s' % python)
        ch = gw.remote_exec('channel.send(channel.receive())')
        ch.send('a')
        res = ch.receive()
        # ``unicode`` exists only on Python 2, where this test runs.
        assert isinstance(res, unicode)  # noqa: F821

        gw.reconfigure(py3str_as_py2str=True)

        ch = gw.remote_exec('channel.send(channel.receive())')
        ch.send('a')
        res = ch.receive()
        assert isinstance(res, str)
        gw.exit()

    @pytest.mark.skipif(sys.version_info < (3,),
                        reason="needs a Python 3 host interpreter")
    def test_3to2(self, makegateway):
        """Python 3 host talking to a Python 2 remote interpreter."""
        python = _find_version('2')
        gw = makegateway('popen//python=%s' % python)

        ch = gw.remote_exec('channel.send(channel.receive())')
        ch.send(bytes('a', 'ascii'))
        res = ch.receive()
        assert isinstance(res, str)

        gw.reconfigure(py3str_as_py2str=True, py2str_as_py3str=False)

        ch = gw.remote_exec('channel.send(channel.receive())')
        ch.send('a')
        res = ch.receive()
        assert isinstance(res, bytes)
        gw.exit()
443
444
@pytest.mark.parametrize('spec, expected_args', [
    ('popen//python=python', ['python']),
    ('popen//python=sudo -u test python', ['sudo', '-u', 'test', 'python']),
    pytest.param(r'popen//python=/hans\ alt/bin/python', ['/hans alt/bin/python'],
                 marks=pytest.mark.skipif(sys.platform.startswith('win'), reason='invalid spec on Windows')),
    ('popen//python="/u/test me/python" -e', ['/u/test me/python', '-e']),
])
def test_popen_args(spec, expected_args):
    """``popen_args`` splits the ``python=`` value and appends the bootstrap."""
    # Every command line ends with the same unbuffered bootstrap invocation.
    bootstrap_tail = ['-u', '-c', gateway_io.popen_bootstrapline]
    actual = gateway_io.popen_args(execnet.XSpec(spec))
    assert actual == expected_args + bootstrap_tail
457