"""Tests for py.io: TextIO/BytesIO, dupfile and sys-/fd-level capturing."""
from __future__ import with_statement

import os
import sys

import py
from py.builtin import print_

# Skip marker for tests that rely on os.dup().
needsdup = py.test.mark.skipif("not hasattr(os, 'dup')")

if sys.version_info >= (3, 0):
    def tobytes(obj):
        """Return *obj* as bytes, encoding text as UTF-8."""
        if isinstance(obj, str):
            obj = obj.encode('UTF-8')
        assert isinstance(obj, bytes)
        return obj

    def totext(obj):
        """Return *obj* as text, decoding bytes as UTF-8."""
        if isinstance(obj, bytes):
            obj = str(obj, 'UTF-8')
        assert isinstance(obj, str)
        return obj
else:
    def tobytes(obj):
        """Return *obj* as a byte string, encoding unicode as UTF-8."""
        if isinstance(obj, unicode):
            obj = obj.encode('UTF-8')
        assert isinstance(obj, str)
        return obj

    def totext(obj):
        """Return *obj* as unicode, decoding byte strings as UTF-8."""
        if isinstance(obj, str):
            obj = unicode(obj, 'UTF-8')
        assert isinstance(obj, unicode)
        return obj


def oswritebytes(fd, obj):
    """Write *obj* (text or bytes) to file descriptor *fd* as bytes."""
    os.write(fd, tobytes(obj))


class TestTextIO:
    """Checks for py.io.TextIO."""

    def test_text(self):
        f = py.io.TextIO()
        f.write("hello")
        s = f.getvalue()
        assert s == "hello"
        f.close()

    def test_unicode_and_str_mixture(self):
        f = py.io.TextIO()
        if sys.version_info >= (3, 0):
            f.write("\u00f6")
            # Callable form of raises(); the string form was removed in
            # pytest 5 and the rest of this module uses the callable form.
            py.test.raises(TypeError, f.write, bytes('hello', 'UTF-8'))
        else:
            f.write(unicode("\u00f6", 'UTF-8'))
            f.write("hello")  # bytes
            s = f.getvalue()
            f.close()
            assert isinstance(s, unicode)


def test_bytes_io():
    """BytesIO accepts bytes and rejects text with a TypeError."""
    f = py.io.BytesIO()
    f.write(tobytes("hello"))
    py.test.raises(TypeError, f.write, totext('hello'))
    s = f.getvalue()
    assert s == tobytes("hello")


def test_dontreadfrominput():
    """DontReadFromInput refuses reading/iterating and has no fileno."""
    from py._io.capture import DontReadFromInput
    f = DontReadFromInput()
    assert not f.isatty()
    py.test.raises(IOError, f.read)
    py.test.raises(IOError, f.readlines)
    py.test.raises(IOError, iter, f)
    py.test.raises(ValueError, f.fileno)
    f.close()  # just for completeness


def pytest_funcarg__tmpfile(request):
    """Provide a 'wb+' file in the testdir that is closed at teardown."""
    testdir = request.getfuncargvalue("testdir")
    f = testdir.makepyfile("").open('wb+')
    request.addfinalizer(f.close)
    return f


@needsdup
def test_dupfile(tmpfile):
    """Each dupfile() call yields a distinct file writing to the same file."""
    flist = []
    for i in range(5):
        nf = py.io.dupfile(tmpfile, encoding="utf-8")
        assert nf != tmpfile
        assert nf.fileno() != tmpfile.fileno()
        assert nf not in flist
        print_(i, end="", file=nf)
        flist.append(nf)
    for nf in flist:
        nf.close()
    tmpfile.seek(0)
    s = tmpfile.read()
    assert "01234" in repr(s)
    tmpfile.close()


def test_dupfile_no_mode():
    """
    dupfile should trap an AttributeError and return f if no mode is supplied.
    """
    class SomeFileWrapper(object):
        "An object with a fileno method but no mode attribute"
        def fileno(self):
            return 1
    tmpfile = SomeFileWrapper()
    assert py.io.dupfile(tmpfile) is tmpfile
    with py.test.raises(AttributeError):
        py.io.dupfile(tmpfile, raising=True)


def lsof_check(func):
    """Run *func* and assert, via ``lsof``, that few regular files leaked.

    Skips the calling test when ``lsof`` cannot be executed.
    """
    pid = os.getpid()
    try:
        out = py.process.cmdexec("lsof -p %d" % pid)
    except py.process.cmdexec.Error:
        py.test.skip("could not run 'lsof'")
    func()
    out2 = py.process.cmdexec("lsof -p %d" % pid)
    # Count the lsof lines mentioning "REG" before and after running func.
    len1 = len([x for x in out.split("\n") if "REG" in x])
    len2 = len([x for x in out2.split("\n") if "REG" in x])
    assert len2 < len1 + 3, out2


class TestFDCapture:
    """Checks for py.io.FDCapture (capturing a single file descriptor)."""
    pytestmark = needsdup

    def test_not_now(self, tmpfile):
        fd = tmpfile.fileno()
        # Not started: nothing written to fd is captured.
        cap = py.io.FDCapture(fd, now=False)
        data = tobytes("hello")
        os.write(fd, data)
        f = cap.done()
        s = f.read()
        f.close()
        assert not s
        # Explicitly started: the write is captured.
        cap = py.io.FDCapture(fd, now=False)
        cap.start()
        os.write(fd, data)
        f = cap.done()
        s = f.read()
        f.close()
        assert s == "hello"

    def test_simple(self, tmpfile):
        fd = tmpfile.fileno()
        cap = py.io.FDCapture(fd)
        data = tobytes("hello")
        os.write(fd, data)
        f = cap.done()
        s = f.read()
        assert s == "hello"
        f.close()

    def test_simple_many(self, tmpfile):
        for i in range(10):
            self.test_simple(tmpfile)

    def test_simple_many_check_open_files(self, tmpfile):
        lsof_check(lambda: self.test_simple_many(tmpfile))

    def test_simple_fail_second_start(self, tmpfile):
        fd = tmpfile.fileno()
        cap = py.io.FDCapture(fd)
        f = cap.done()
        py.test.raises(ValueError, cap.start)
        f.close()

    def test_stderr(self):
        cap = py.io.FDCapture(2, patchsys=True)
        print_("hello", file=sys.stderr)
        f = cap.done()
        s = f.read()
        f.close()
        assert s == "hello\n"

    def test_stdin(self, tmpfile):
        tmpfile.write(tobytes("3"))
        tmpfile.seek(0)
        cap = py.io.FDCapture(0, tmpfile=tmpfile)
        # check with os.read() directly instead of raw_input(), because
        # sys.stdin itself may be redirected (as py.test now does by default)
        x = os.read(0, 100).strip()
        cap.done()
        assert x == tobytes("3")

    def test_writeorg(self, tmpfile):
        """Writes via tmpfile are captured; writeorg() data lands in the file."""
        data1, data2 = tobytes("foo"), tobytes("bar")
        try:
            cap = py.io.FDCapture(tmpfile.fileno())
            tmpfile.write(data1)
            cap.writeorg(data2)
        finally:
            tmpfile.close()
        f = cap.done()
        scap = f.read()
        f.close()
        assert scap == totext(data1)
        # Use a context manager so the handle is not leaked.
        with open(tmpfile.name, 'rb') as fh:
            stmp = fh.read()
        assert stmp == data2


class TestStdCapture:
    """Checks for py.io.StdCapture; subclasses swap in other capture setups."""

    def getcapture(self, **kw):
        return py.io.StdCapture(**kw)

    def test_capturing_done_simple(self):
        cap = self.getcapture()
        sys.stdout.write("hello")
        sys.stderr.write("world")
        outfile, errfile = cap.done()
        s = outfile.read()
        assert s == "hello"
        s = errfile.read()
        assert s == "world"

    def test_capturing_reset_simple(self):
        cap = self.getcapture()
        print("hello world")
        sys.stderr.write("hello error\n")
        out, err = cap.reset()
        assert out == "hello world\n"
        assert err == "hello error\n"

    def test_capturing_readouterr(self):
        cap = self.getcapture()
        try:
            print("hello world")
            sys.stderr.write("hello error\n")
            out, err = cap.readouterr()
            assert out == "hello world\n"
            assert err == "hello error\n"
            sys.stderr.write("error2")
        finally:
            out, err = cap.reset()
        assert err == "error2"

    def test_capturing_readouterr_unicode(self):
        cap = self.getcapture()
        try:
            print("hx\xc4\x85\xc4\x87")
            out, err = cap.readouterr()
        finally:
            # Always undo the capture so later tests see the real streams.
            cap.reset()
        assert out == py.builtin._totext("hx\xc4\x85\xc4\x87\n", "utf8")

    @py.test.mark.skipif('sys.version_info >= (3,)',
                         reason='text output different for bytes on python3')
    def test_capturing_readouterr_decode_error_handling(self):
        cap = self.getcapture()
        try:
            # triggered an internal error in pytest
            print('\xa6')
            out, err = cap.readouterr()
        finally:
            # Always undo the capture so later tests see the real streams.
            cap.reset()
        assert out == py.builtin._totext('\ufffd\n', 'unicode-escape')

    def test_capturing_mixed(self):
        cap = self.getcapture(mixed=True)
        sys.stdout.write("hello ")
        sys.stderr.write("world")
        sys.stdout.write(".")
        out, err = cap.reset()
        assert out.strip() == "hello world."
        assert not err

    def test_reset_twice_error(self):
        cap = self.getcapture()
        print("hello")
        out, err = cap.reset()
        py.test.raises(ValueError, cap.reset)
        assert out == "hello\n"
        assert not err

    def test_capturing_modify_sysouterr_in_between(self):
        oldout = sys.stdout
        olderr = sys.stderr
        cap = self.getcapture()
        sys.stdout.write("hello")
        sys.stderr.write("world")
        # Output written after the streams are replaced must not be captured.
        sys.stdout = py.io.TextIO()
        sys.stderr = py.io.TextIO()
        print("not seen")
        sys.stderr.write("not seen\n")
        out, err = cap.reset()
        assert out == "hello"
        assert err == "world"
        assert sys.stdout == oldout
        assert sys.stderr == olderr

    def test_capturing_error_recursive(self):
        cap1 = self.getcapture()
        print("cap1")
        cap2 = self.getcapture()
        print("cap2")
        out2, err2 = cap2.reset()
        out1, err1 = cap1.reset()
        assert out1 == "cap1\n"
        assert out2 == "cap2\n"

    def test_just_out_capture(self):
        cap = self.getcapture(out=True, err=False)
        sys.stdout.write("hello")
        sys.stderr.write("world")
        out, err = cap.reset()
        assert out == "hello"
        assert not err

    def test_just_err_capture(self):
        cap = self.getcapture(out=False, err=True)
        sys.stdout.write("hello")
        sys.stderr.write("world")
        out, err = cap.reset()
        assert err == "world"
        assert not out

    def test_stdin_restored(self):
        old = sys.stdin
        cap = self.getcapture(in_=True)
        newstdin = sys.stdin
        cap.reset()
        assert newstdin != sys.stdin
        assert sys.stdin is old

    def test_stdin_nulled_by_default(self):
        print("XXX this test may well hang instead of crashing")
        print("XXX which indicates an error in the underlying capturing")
        print("XXX mechanisms")
        cap = self.getcapture()
        try:
            # sys.stdin is looked up after the capture was installed.
            py.test.raises(IOError, sys.stdin.read)
        finally:
            cap.reset()

    def test_suspend_resume(self):
        cap = self.getcapture(out=True, err=False, in_=False)
        try:
            print("hello")
            sys.stderr.write("error\n")
            out, err = cap.suspend()
            assert out == "hello\n"
            assert not err
            print("in between")
            sys.stderr.write("in between\n")
            cap.resume()
            print("after")
            sys.stderr.write("error_after\n")
        finally:
            out, err = cap.reset()
        assert out == "after\n"
        assert not err


class TestStdCaptureNotNow(TestStdCapture):
    """Re-run the StdCapture checks with now=False plus explicit startall()."""

    def getcapture(self, **kw):
        kw['now'] = False
        cap = py.io.StdCapture(**kw)
        cap.startall()
        return cap


class TestStdCaptureFD(TestStdCapture):
    """Re-run the StdCapture checks against py.io.StdCaptureFD."""
    pytestmark = needsdup

    def getcapture(self, **kw):
        return py.io.StdCaptureFD(**kw)

    def test_intermingling(self):
        cap = self.getcapture()
        oswritebytes(1, "1")
        sys.stdout.write(str(2))
        sys.stdout.flush()
        oswritebytes(1, "3")
        oswritebytes(2, "a")
        sys.stderr.write("b")
        sys.stderr.flush()
        oswritebytes(2, "c")
        out, err = cap.reset()
        assert out == "123"
        assert err == "abc"

    def test_callcapture(self):
        def func(x, y):
            print(x)
            sys.stderr.write(str(y))
            return 42

        res, out, err = py.io.StdCaptureFD.call(func, 3, y=4)
        assert res == 42
        assert out.startswith("3")
        assert err.startswith("4")

    def test_many(self, capfd):
        def f():
            for i in range(10):
                cap = py.io.StdCaptureFD()
                cap.reset()
        lsof_check(f)


class TestStdCaptureFDNotNow(TestStdCaptureFD):
    """Re-run the StdCaptureFD checks with now=False plus explicit startall()."""
    pytestmark = needsdup

    def getcapture(self, **kw):
        kw['now'] = False
        cap = py.io.StdCaptureFD(**kw)
        cap.startall()
        return cap


@needsdup
def test_stdcapture_fd_tmpfile(tmpfile):
    """A file passed as ``out=`` is the file handed back by done()."""
    capfd = py.io.StdCaptureFD(out=tmpfile)
    os.write(1, "hello".encode("ascii"))
    os.write(2, "world".encode("ascii"))
    outf, errf = capfd.done()
    assert outf == tmpfile


class TestStdCaptureFDinvalidFD:
    """StdCaptureFD must cope with fds 0/1/2 having been closed."""
    pytestmark = needsdup

    def test_stdcapture_fd_invalid_fd(self, testdir):
        testdir.makepyfile("""
            import py, os
            def test_stdout():
                os.close(1)
                cap = py.io.StdCaptureFD(out=True, err=False, in_=False)
                cap.done()
            def test_stderr():
                os.close(2)
                cap = py.io.StdCaptureFD(out=False, err=True, in_=False)
                cap.done()
            def test_stdin():
                os.close(0)
                cap = py.io.StdCaptureFD(out=False, err=False, in_=True)
                cap.done()
        """)
        result = testdir.runpytest("--capture=fd")
        assert result.ret == 0
        assert result.parseoutcomes()['passed'] == 3


def test_capture_not_started_but_reset():
    """done()/reset() on a never-started capture must not raise."""
    capsys = py.io.StdCapture(now=False)
    capsys.done()
    capsys.done()
    capsys.reset()


@needsdup
def test_capture_no_sys():
    """With patchsys=False only direct fd writes show up in the FD capture."""
    capsys = py.io.StdCapture()
    try:
        cap = py.io.StdCaptureFD(patchsys=False)
        sys.stdout.write("hello")
        sys.stderr.write("world")
        oswritebytes(1, "1")
        oswritebytes(2, "2")
        out, err = cap.reset()
        assert out == "1"
        assert err == "2"
    finally:
        capsys.reset()


@needsdup
def test_callcapture_nofd():
    """StdCapture.call() output starts with the sys-level writes of func."""
    def func(x, y):
        oswritebytes(1, "hello")
        oswritebytes(2, "hello")
        print(x)
        sys.stderr.write(str(y))
        return 42

    capfd = py.io.StdCaptureFD(patchsys=False)
    try:
        res, out, err = py.io.StdCapture.call(func, 3, y=4)
    finally:
        capfd.reset()
    assert res == 42
    assert out.startswith("3")
    assert err.startswith("4")


@needsdup
@py.test.mark.parametrize('use', [True, False])
def test_fdcapture_tmpfile_remains_the_same(tmpfile, use):
    """suspend()/resume() must keep using the same err tmpfile."""
    if not use:
        tmpfile = True
    cap = py.io.StdCaptureFD(out=False, err=tmpfile, now=False)
    try:
        cap.startall()
        capfile = cap.err.tmpfile
        cap.suspend()
        cap.resume()
        capfile2 = cap.err.tmpfile
    finally:
        # Undo the capture so it does not stay active for later tests.
        cap.reset()
    assert capfile2 == capfile


@py.test.mark.parametrize('method', ['StdCapture', 'StdCaptureFD'])
def test_capturing_and_logging_fundamentals(testdir, method):
    """Logging output follows suspend()/resume() of the capture."""
    if method == "StdCaptureFD" and not hasattr(os, 'dup'):
        py.test.skip("need os.dup")
    # here we check a fundamental feature
    # NOTE: the script uses logging.warning(); the logging.warn alias is
    # deprecated and was removed in Python 3.13.
    p = testdir.makepyfile("""
        import sys, os
        import py, logging
        cap = py.io.%s(out=False, in_=False)

        logging.warning("hello1")
        outerr = cap.suspend()
        print ("suspend, captured %%s" %%(outerr,))
        logging.warning("hello2")

        cap.resume()
        logging.warning("hello3")

        outerr = cap.suspend()
        print ("suspend2, captured %%s" %% (outerr,))
    """ % (method,))
    result = testdir.runpython(p)
    result.stdout.fnmatch_lines([
        "suspend, captured*hello1*",
        "suspend2, captured*hello2*WARNING:root:hello3*",
    ])
    assert "atexit" not in result.stderr.str()