1from __future__ import with_statement
2
3import os, sys
4import py
5
# Skip marker for tests that need os.dup; applied below as a decorator
# (@needsdup) on functions and as ``pytestmark`` on test classes.
needsdup = py.test.mark.skipif("not hasattr(os, 'dup')")
7
8from py.builtin import print_
9
# Version-specific helpers: the text/bytes type names differ between
# Python 2 (unicode/str) and Python 3 (str/bytes).
if sys.version_info >= (3,0):
    def tobytes(obj):
        """Return *obj* as bytes, UTF-8 encoding it first when it is text."""
        data = obj.encode('UTF-8') if isinstance(obj, str) else obj
        assert isinstance(data, bytes)
        return data

    def totext(obj):
        """Return *obj* as text, UTF-8 decoding it first when it is bytes."""
        text = str(obj, 'UTF-8') if isinstance(obj, bytes) else obj
        assert isinstance(text, str)
        return text
else:
    def tobytes(obj):
        """Return *obj* as a byte string, UTF-8 encoding unicode input."""
        data = obj.encode('UTF-8') if isinstance(obj, unicode) else obj
        assert isinstance(data, str)
        return data

    def totext(obj):
        """Return *obj* as unicode, UTF-8 decoding byte-string input."""
        text = unicode(obj, 'UTF-8') if isinstance(obj, str) else obj
        assert isinstance(text, unicode)
        return text
32
def oswritebytes(fd, obj):
    """Write *obj* to the OS-level descriptor *fd*, converting it with tobytes()."""
    payload = tobytes(obj)
    os.write(fd, payload)
35
class TestTextIO:
    """Tests for the write()/getvalue() behaviour of py.io.TextIO."""

    def test_text(self):
        """Text written to the buffer comes back unchanged from getvalue()."""
        f = py.io.TextIO()
        f.write("hello")
        s = f.getvalue()
        assert s == "hello"
        f.close()

    def test_unicode_and_str_mixture(self):
        """Check how TextIO handles bytes written after text.

        On Python 3 writing bytes must raise TypeError; on Python 2 a byte
        string is accepted and getvalue() still returns unicode.
        """
        f = py.io.TextIO()
        if sys.version_info >= (3,0):
            f.write("\u00f6")
            # Pass the callable and its argument instead of a code string:
            # the string form of raises() is gone from current pytest, and
            # the callable form is what the other tests in this file use.
            py.test.raises(TypeError, f.write, bytes('hello', 'UTF-8'))
        else:
            f.write(unicode("\u00f6", 'UTF-8'))
            f.write("hello") # bytes
            s = f.getvalue()
            f.close()
            assert isinstance(s, unicode)
55
def test_bytes_io():
    """py.io.BytesIO accepts bytes, rejects text with TypeError, and keeps only the bytes."""
    f = py.io.BytesIO()
    f.write(tobytes("hello"))
    # Callable form of raises(): the code-string form is no longer supported
    # by current pytest, and this matches the other raises() calls in this file.
    py.test.raises(TypeError, f.write, totext('hello'))
    s = f.getvalue()
    assert s == tobytes("hello")
62
def test_dontreadfrominput():
    """DontReadFromInput is not a tty, raises on every read, and has no fileno."""
    from py._io.capture import DontReadFromInput
    stub = DontReadFromInput()
    assert not stub.isatty()
    for reader in (stub.read, stub.readlines):
        py.test.raises(IOError, reader)
    py.test.raises(IOError, iter, stub)
    py.test.raises(ValueError, stub.fileno)
    stub.close() # just for completeness
72
def pytest_funcarg__tmpfile(request):
    """Funcarg factory for ``tmpfile``.

    Creates an empty file through the ``testdir`` funcarg, opens it in
    'wb+' mode and registers its close() as a finalizer on *request*.
    """
    testdir = request.getfuncargvalue("testdir")
    path = testdir.makepyfile("")
    handle = path.open('wb+')
    request.addfinalizer(handle.close)
    return handle
78
@needsdup
def test_dupfile(tmpfile):
    """Each dupfile() call yields a distinct file whose writes land in tmpfile."""
    dups = []
    for index in range(5):
        dup = py.io.dupfile(tmpfile, encoding="utf-8")
        assert dup != tmpfile
        assert dup.fileno() != tmpfile.fileno()
        assert dup not in dups
        print_(index, end="", file=dup)
        dups.append(dup)
    # Close the duplicates in creation order before reading back.
    for dup in dups:
        dup.close()
    tmpfile.seek(0)
    content = tmpfile.read()
    assert "01234" in repr(content)
    tmpfile.close()
96
def test_dupfile_no_mode():
    """
    For an object without a ``mode`` attribute dupfile returns that same
    object, unless ``raising=True`` is passed, in which case the
    AttributeError propagates.
    """
    class SomeFileWrapper(object):
        "An object with a fileno method but no mode attribute"
        def fileno(self):
            return 1
    wrapper = SomeFileWrapper()
    returned = py.io.dupfile(wrapper)
    assert returned is wrapper
    with py.test.raises(AttributeError):
        py.io.dupfile(wrapper, raising=True)
109
def lsof_check(func):
    """Run *func* and assert it did not grow the process's ``lsof`` "REG" listing.

    ``lsof -p <pid>`` is executed before and after calling *func*; the test
    is skipped when the first invocation fails.  The number of lines
    containing "REG" afterwards must be less than the earlier count plus 3.
    """
    command = "lsof -p %d" % os.getpid()
    try:
        before = py.process.cmdexec(command)
    except py.process.cmdexec.Error:
        py.test.skip("could not run 'lsof'")
    func()
    after = py.process.cmdexec(command)

    def count_reg(listing):
        # number of lsof output lines mentioning "REG"
        return len([line for line in listing.split("\n") if "REG" in line])

    assert count_reg(after) < count_reg(before) + 3, after
121
class TestFDCapture:
    """Tests for py.io.FDCapture, exercised on a temporary file and on fds 0 and 2.

    Files returned by ``cap.done()`` are closed once read so that the tests
    do not leave descriptors open (open files are counted by
    test_simple_many_check_open_files via lsof_check).
    """
    pytestmark = needsdup

    def test_not_now(self, tmpfile):
        """With now=False nothing is captured until start() is called."""
        fd = tmpfile.fileno()
        cap = py.io.FDCapture(fd, now=False)
        data = tobytes("hello")
        os.write(fd, data)
        f = cap.done()
        s = f.read()
        f.close()
        assert not s
        cap = py.io.FDCapture(fd, now=False)
        cap.start()
        os.write(fd, data)
        f = cap.done()
        s = f.read()
        f.close()
        assert s == "hello"

    def test_simple(self, tmpfile):
        """Bytes written to the fd while capturing are readable from done()."""
        fd = tmpfile.fileno()
        cap = py.io.FDCapture(fd)
        data = tobytes("hello")
        os.write(fd, data)
        f = cap.done()
        s = f.read()
        assert s == "hello"
        f.close()

    def test_simple_many(self, tmpfile):
        """Repeat the simple capture cycle ten times on the same file."""
        for i in range(10):
            self.test_simple(tmpfile)

    def test_simple_many_check_open_files(self, tmpfile):
        """The repeated capture cycle must pass the lsof_check open-file bound."""
        lsof_check(lambda: self.test_simple_many(tmpfile))

    def test_simple_fail_second_start(self, tmpfile):
        """Calling start() after done() raises ValueError."""
        fd = tmpfile.fileno()
        cap = py.io.FDCapture(fd)
        f = cap.done()
        py.test.raises(ValueError, cap.start)
        f.close()

    def test_stderr(self):
        """With patchsys=True, text printed to sys.stderr is captured on fd 2."""
        cap = py.io.FDCapture(2, patchsys=True)
        print_("hello", file=sys.stderr)
        f = cap.done()
        s = f.read()
        f.close()
        assert s == "hello\n"

    def test_stdin(self, tmpfile):
        """Capturing fd 0 with a tmpfile makes os.read(0) return its content."""
        tmpfile.write(tobytes("3"))
        tmpfile.seek(0)
        cap = py.io.FDCapture(0, tmpfile=tmpfile)
        # check with os.read() directly instead of raw_input(), because
        # sys.stdin itself may be redirected (as py.test now does by default)
        x = os.read(0, 100).strip()
        f = cap.done()
        assert x == tobytes("3")

    def test_writeorg(self, tmpfile):
        """writeorg() reaches the original file while normal writes are captured."""
        data1, data2 = tobytes("foo"), tobytes("bar")
        try:
            cap = py.io.FDCapture(tmpfile.fileno())
            tmpfile.write(data1)
            cap.writeorg(data2)
        finally:
            tmpfile.close()
        f = cap.done()
        scap = f.read()
        f.close()
        assert scap == totext(data1)
        # Read the original file back through a context manager so the
        # handle is closed deterministically instead of being leaked.
        with open(tmpfile.name, 'rb') as written:
            stmp = written.read()
        assert stmp == data2
194
195
class TestStdCapture:
    """Behavioural tests for py.io.StdCapture.

    Subclasses override getcapture() to run the same tests against other
    capture flavours.  Every test that starts a capture also ends it, so no
    redirection of sys.stdout/sys.stderr/sys.stdin outlives a test.
    """

    def getcapture(self, **kw):
        """Return the capture object under test, built from keyword options."""
        return py.io.StdCapture(**kw)

    def test_capturing_done_simple(self):
        cap = self.getcapture()
        sys.stdout.write("hello")
        sys.stderr.write("world")
        outfile, errfile = cap.done()
        s = outfile.read()
        assert s == "hello"
        s = errfile.read()
        assert s == "world"

    def test_capturing_reset_simple(self):
        cap = self.getcapture()
        print("hello world")
        sys.stderr.write("hello error\n")
        out, err = cap.reset()
        assert out == "hello world\n"
        assert err == "hello error\n"

    def test_capturing_readouterr(self):
        cap = self.getcapture()
        try:
            print ("hello world")
            sys.stderr.write("hello error\n")
            out, err = cap.readouterr()
            assert out == "hello world\n"
            assert err == "hello error\n"
            sys.stderr.write("error2")
        finally:
            out, err = cap.reset()
        assert err == "error2"

    def test_capturing_readouterr_unicode(self):
        cap = self.getcapture()
        try:
            print ("hx\xc4\x85\xc4\x87")
            out, err = cap.readouterr()
        finally:
            # readouterr() does not stop capturing; reset so the streams
            # are restored even when the read fails.
            cap.reset()
        assert out == py.builtin._totext("hx\xc4\x85\xc4\x87\n", "utf8")

    @py.test.mark.skipif('sys.version_info >= (3,)',
                      reason='text output different for bytes on python3')
    def test_capturing_readouterr_decode_error_handling(self):
        cap = self.getcapture()
        try:
            # triggered a internal error in pytest
            print('\xa6')
            out, err = cap.readouterr()
        finally:
            # same as above: make sure the capture is ended
            cap.reset()
        assert out == py.builtin._totext('\ufffd\n', 'unicode-escape')

    def test_capturing_mixed(self):
        cap = self.getcapture(mixed=True)
        sys.stdout.write("hello ")
        sys.stderr.write("world")
        sys.stdout.write(".")
        out, err = cap.reset()
        assert out.strip() == "hello world."
        assert not err

    def test_reset_twice_error(self):
        cap = self.getcapture()
        print ("hello")
        out, err = cap.reset()
        py.test.raises(ValueError, cap.reset)
        assert out == "hello\n"
        assert not err

    def test_capturing_modify_sysouterr_in_between(self):
        oldout = sys.stdout
        olderr = sys.stderr
        cap = self.getcapture()
        sys.stdout.write("hello")
        sys.stderr.write("world")
        sys.stdout = py.io.TextIO()
        sys.stderr = py.io.TextIO()
        print ("not seen")
        sys.stderr.write("not seen\n")
        out, err = cap.reset()
        assert out == "hello"
        assert err == "world"
        assert sys.stdout == oldout
        assert sys.stderr == olderr

    def test_capturing_error_recursive(self):
        cap1 = self.getcapture()
        print ("cap1")
        cap2 = self.getcapture()
        print ("cap2")
        out2, err2 = cap2.reset()
        out1, err1 = cap1.reset()
        assert out1 == "cap1\n"
        assert out2 == "cap2\n"

    def test_just_out_capture(self):
        cap = self.getcapture(out=True, err=False)
        sys.stdout.write("hello")
        sys.stderr.write("world")
        out, err = cap.reset()
        assert out == "hello"
        assert not err

    def test_just_err_capture(self):
        cap = self.getcapture(out=False, err=True)
        sys.stdout.write("hello")
        sys.stderr.write("world")
        out, err = cap.reset()
        assert err == "world"
        assert not out

    def test_stdin_restored(self):
        old = sys.stdin
        cap = self.getcapture(in_=True)
        newstdin = sys.stdin
        out, err = cap.reset()
        assert newstdin != sys.stdin
        assert sys.stdin is old

    def test_stdin_nulled_by_default(self):
        print ("XXX this test may well hang instead of crashing")
        print ("XXX which indicates an error in the underlying capturing")
        print ("XXX mechanisms")
        cap = self.getcapture()
        try:
            # Callable form of raises(); sys.stdin is looked up here, i.e.
            # after the capture has replaced it.
            py.test.raises(IOError, sys.stdin.read)
        finally:
            # always end the capture, even if the expectation above fails
            out, err = cap.reset()

    def test_suspend_resume(self):
        cap = self.getcapture(out=True, err=False, in_=False)
        try:
            print ("hello")
            sys.stderr.write("error\n")
            out, err = cap.suspend()
            assert out == "hello\n"
            assert not err
            print ("in between")
            sys.stderr.write("in between\n")
            cap.resume()
            print ("after")
            sys.stderr.write("error_after\n")
        finally:
            out, err = cap.reset()
        assert out == "after\n"
        assert not err
338
class TestStdCaptureNotNow(TestStdCapture):
    """Run the TestStdCapture tests on a capture built with now=False and started via startall()."""

    def getcapture(self, **kw):
        options = dict(kw, now=False)
        capture = py.io.StdCapture(**options)
        capture.startall()
        return capture
345
class TestStdCaptureFD(TestStdCapture):
    """Run the TestStdCapture tests against py.io.StdCaptureFD, plus fd-level checks."""
    pytestmark = needsdup

    def getcapture(self, **kw):
        return py.io.StdCaptureFD(**kw)

    def test_intermingling(self):
        """os.write() output and flushed sys.stdout/sys.stderr output interleave in order."""
        capture = self.getcapture()
        # stdout: descriptor write, stream write, descriptor write
        oswritebytes(1, "1")
        sys.stdout.write(str(2))
        sys.stdout.flush()
        oswritebytes(1, "3")
        # stderr: same pattern
        oswritebytes(2, "a")
        sys.stderr.write("b")
        sys.stderr.flush()
        oswritebytes(2, "c")
        captured_out, captured_err = capture.reset()
        assert captured_out == "123"
        assert captured_err == "abc"

    def test_callcapture(self):
        """StdCaptureFD.call returns the function result together with its output."""
        def emit(x, y):
            print (x)
            sys.stderr.write(str(y))
            return 42

        result, captured_out, captured_err = py.io.StdCaptureFD.call(emit, 3, y=4)
        assert result == 42
        assert captured_out.startswith("3")
        assert captured_err.startswith("4")

    def test_many(self, capfd):
        """Creating and resetting ten captures must pass the lsof_check bound."""
        def create_and_reset():
            for _ in range(10):
                capture = py.io.StdCaptureFD()
                capture.reset()
        lsof_check(create_and_reset)
383
class TestStdCaptureFDNotNow(TestStdCaptureFD):
    """Run the TestStdCaptureFD tests on a capture built with now=False and started via startall()."""
    pytestmark = needsdup

    def getcapture(self, **kw):
        options = dict(kw, now=False)
        capture = py.io.StdCaptureFD(**options)
        capture.startall()
        return capture
392
@needsdup
def test_stdcapture_fd_tmpfile(tmpfile):
    """A file passed as ``out`` is the out-file handed back by done()."""
    capture = py.io.StdCaptureFD(out=tmpfile)
    for fd, text in ((1, "hello"), (2, "world")):
        os.write(fd, text.encode("ascii"))
    outfile, errfile = capture.done()
    assert outfile == tmpfile
400
class TestStdCaptureFDinvalidFD:
    """StdCaptureFD on closed standard descriptors, run in a nested pytest session."""
    pytestmark = needsdup

    def test_stdcapture_fd_invalid_fd(self, testdir):
        """Generate three tests that close fd 1, 2 or 0 first; all three must pass."""
        testdir.makepyfile("""
            import py, os
            def test_stdout():
                os.close(1)
                cap = py.io.StdCaptureFD(out=True, err=False, in_=False)
                cap.done()
            def test_stderr():
                os.close(2)
                cap = py.io.StdCaptureFD(out=False, err=True, in_=False)
                cap.done()
            def test_stdin():
                os.close(0)
                cap = py.io.StdCaptureFD(out=False, err=False, in_=True)
                cap.done()
        """)
        run = testdir.runpytest("--capture=fd")
        assert run.ret == 0
        outcomes = run.parseoutcomes()
        assert outcomes['passed'] == 3
422
def test_capture_not_started_but_reset():
    """done() twice and then reset() on a never-started StdCapture must not raise."""
    capture = py.io.StdCapture(now=False)
    for _ in range(2):
        capture.done()
    capture.reset()
428
@needsdup
def test_capture_no_sys():
    """With patchsys=False StdCaptureFD sees only descriptor-level writes.

    An outer StdCapture takes the sys.stdout/sys.stderr writes, so only the
    os.write() output is expected from the fd capture.
    """
    outer = py.io.StdCapture()
    try:
        fdcapture = py.io.StdCaptureFD(patchsys=False)
        sys.stdout.write("hello")
        sys.stderr.write("world")
        oswritebytes(1, "1")
        oswritebytes(2, "2")
        fd_out, fd_err = fdcapture.reset()
        assert fd_out == "1"
        assert fd_err == "2"
    finally:
        outer.reset()
443
@needsdup
def test_callcapture_nofd():
    """StdCapture.call returns only sys-level output, not os.write() output.

    An enclosing StdCaptureFD(patchsys=False) takes the descriptor-level
    writes made by the called function.
    """
    def emit(x, y):
        oswritebytes(1, "hello")
        oswritebytes(2, "hello")
        print (x)
        sys.stderr.write(str(y))
        return 42

    fdcapture = py.io.StdCaptureFD(patchsys=False)
    try:
        result, captured_out, captured_err = py.io.StdCapture.call(emit, 3, y=4)
    finally:
        fdcapture.reset()
    assert result == 42
    assert captured_out.startswith("3")
    assert captured_err.startswith("4")
461
@needsdup
@py.test.mark.parametrize('use', [True, False])
def test_fdcapture_tmpfile_remains_the_same(tmpfile, use):
    """The err capture keeps the same tmpfile across suspend()/resume().

    Parametrized on *use*: when true the ``tmpfile`` funcarg is passed as
    ``err``; otherwise ``err=True`` is passed instead.
    """
    if not use:
        tmpfile = True
    cap = py.io.StdCaptureFD(out=False, err=tmpfile, now=False)
    cap.startall()
    try:
        capfile = cap.err.tmpfile
        cap.suspend()
        cap.resume()
        capfile2 = cap.err.tmpfile
        assert capfile2 == capfile
    finally:
        # resume() leaves capturing active; end it so the redirection does
        # not outlive this test.
        cap.reset()
474
@py.test.mark.parametrize('method', ['StdCapture', 'StdCaptureFD'])
def test_capturing_and_logging_fundamentals(testdir, method):
    """Logging output follows suspend()/resume() of a py.io capture.

    A script is generated for the capture class named by *method* and run
    with testdir.runpython; its stdout must show "hello1" in the first
    suspend() result and "hello2" plus a "WARNING:root:hello3" record in
    the second line.  The script's stderr must not mention "atexit".
    """
    if method == "StdCaptureFD" and not hasattr(os, 'dup'):
        py.test.skip("need os.dup")
    # here we check a fundamental feature
    # The script calls logging.warning(); logging.warn() is a deprecated
    # alias that produces the same "WARNING:root:..." records.
    p = testdir.makepyfile("""
        import sys, os
        import py, logging
        cap = py.io.%s(out=False, in_=False)

        logging.warning("hello1")
        outerr = cap.suspend()
        print ("suspend, captured %%s" %%(outerr,))
        logging.warning("hello2")

        cap.resume()
        logging.warning("hello3")

        outerr = cap.suspend()
        print ("suspend2, captured %%s" %% (outerr,))
    """ % (method,))
    result = testdir.runpython(p)
    result.stdout.fnmatch_lines([
        "suspend, captured*hello1*",
        "suspend2, captured*hello2*WARNING:root:hello3*",
    ])
    assert "atexit" not in result.stderr.str()
502