"""Tests for xopen: reading, writing and appending plain and compressed files."""
import io
import os
import random
import shutil
import signal
import sys
import time
import pytest
from pathlib import Path

from xopen import xopen, PipedCompressionWriter, PipedGzipReader, \
    PipedGzipWriter, _MAX_PIPE_SIZE, _can_read_concatenated_gz

extensions = ["", ".gz", ".bz2"]

try:
    import lzma
    extensions.append(".xz")
except ImportError:
    lzma = None

try:
    import fcntl
    # Not every Python version exposes F_GETPIPE_SZ; 1032 is the value of
    # that constant on Linux, so provide it there when it is missing.
    if not hasattr(fcntl, "F_GETPIPE_SZ") and sys.platform == "linux":
        setattr(fcntl, "F_GETPIPE_SZ", 1032)
except ImportError:
    fcntl = None

base = "tests/file.txt"
files = [base + ext for ext in extensions]
CONTENT_LINES = ['Testing, testing ...\n', 'The second line.\n']
CONTENT = ''.join(CONTENT_LINES)


@pytest.fixture(params=extensions)
def ext(request):
    """Parametrize a test over all supported file extensions."""
    return request.param


@pytest.fixture(params=files)
def fname(request):
    """Parametrize a test over the test input file in every supported format."""
    return request.param


@pytest.fixture
def lacking_pigz_permissions(tmp_path):
    """
    Set PATH to a directory that contains a pigz binary with permissions set to 000.
    If no suitable pigz binary could be found, PATH is set to an empty directory.
    The original PATH is restored afterwards.
    """
    pigz_path = shutil.which("pigz")
    if pigz_path:
        shutil.copy(pigz_path, str(tmp_path))
        os.chmod(str(tmp_path / "pigz"), 0)

    path = os.environ["PATH"]
    os.environ["PATH"] = str(tmp_path)
    yield
    os.environ["PATH"] = path


@pytest.fixture
def large_gzip(tmpdir):
    """Write 1 MiB of repeated random text to "large.gz" and return its path."""
    path = str(tmpdir.join("large.gz"))
    random_text = ''.join(random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ\n') for _ in range(1024))
    # Make the text a lot bigger in order to ensure that it is larger than the
    # pipe buffer size.
    random_text *= 1024
    with xopen(path, 'w') as f:
        f.write(random_text)
    return path


@pytest.fixture
def truncated_gzip(large_gzip):
    """Cut the last 10 bytes off the large gzip file and return its path."""
    with open(large_gzip, 'a') as f:
        f.truncate(os.stat(large_gzip).st_size - 10)
    return large_gzip


def test_xopen_text(fname):
    """Iterating in text mode yields both lines as str."""
    with xopen(fname, 'rt') as f:
        lines = list(f)
        assert len(lines) == 2
        assert lines[1] == 'The second line.\n', fname


def test_xopen_binary(fname):
    """Iterating in binary mode yields both lines as bytes."""
    with xopen(fname, 'rb') as f:
        lines = list(f)
        assert len(lines) == 2
        assert lines[1] == b'The second line.\n', fname


def test_no_context_manager_text(fname):
    """Text-mode reading works without ``with`` and close() marks the file closed."""
    f = xopen(fname, 'rt')
    lines = list(f)
    assert len(lines) == 2
    assert lines[1] == 'The second line.\n', fname
    f.close()
    assert f.closed


def test_no_context_manager_binary(fname):
    """Binary-mode reading works without ``with`` and close() marks the file closed."""
    f = xopen(fname, 'rb')
    lines = list(f)
    assert len(lines) == 2
    assert lines[1] == b'The second line.\n', fname
    f.close()
    assert f.closed


def test_readinto(fname):
    # Test whether .readinto() works
    content = CONTENT.encode('utf-8')
    with xopen(fname, 'rb') as f:
        # The buffer is deliberately larger than the file content
        b = bytearray(len(content) + 100)
        length = f.readinto(b)
        assert length == len(content)
        assert b[:length] == content


def test_pipedgzipreader_readinto():
    # Test whether PipedGzipReader.readinto works
    content = CONTENT.encode('utf-8')
    with PipedGzipReader("tests/file.txt.gz", "rb") as f:
        # The buffer is deliberately larger than the file content
        b = bytearray(len(content) + 100)
        length = f.readinto(b)
        assert length == len(content)
        assert b[:length] == content


def test_pipedgzipreader_textiowrapper():
    """A binary PipedGzipReader can be wrapped in io.TextIOWrapper."""
    with PipedGzipReader("tests/file.txt.gz", "rb") as f:
        wrapped = io.TextIOWrapper(f)
        assert wrapped.read() == CONTENT


def test_detect_gzip_file_format_from_content():
    """A gzip file whose name does not end in .gz is still read correctly."""
    with xopen("tests/file.txt.gz.test", "rb") as fh:
        assert fh.readline() == CONTENT_LINES[0].encode("utf-8")


def test_detect_bz2_file_format_from_content():
    """A bz2 file whose name does not end in .bz2 is still read correctly."""
    with xopen("tests/file.txt.bz2.test", "rb") as fh:
        assert fh.readline() == CONTENT_LINES[0].encode("utf-8")


def test_readline(fname):
    """readline() in binary mode returns the first line as bytes."""
    first_line = CONTENT_LINES[0].encode('utf-8')
    with xopen(fname, 'rb') as f:
        assert f.readline() == first_line


def test_readline_text(fname):
    """readline() in text mode returns the first line as str."""
    with xopen(fname, 'r') as f:
        assert f.readline() == CONTENT_LINES[0]


def test_readline_pipedgzipreader():
    """PipedGzipReader.readline() in binary mode returns the first line as bytes."""
    first_line = CONTENT_LINES[0].encode('utf-8')
    with PipedGzipReader("tests/file.txt.gz", "rb") as f:
        assert f.readline() == first_line


def test_readline_text_pipedgzipreader():
    """PipedGzipReader.readline() in text mode returns the first line as str."""
    with PipedGzipReader("tests/file.txt.gz", "r") as f:
        assert f.readline() == CONTENT_LINES[0]


@pytest.mark.parametrize("threads", [None, 1, 2])
def test_pipedgzipreader_iter(threads):
    """Iterating a PipedGzipReader yields lines for several ``threads`` settings."""
    with PipedGzipReader("tests/file.txt.gz", mode="r", threads=threads) as f:
        lines = list(f)
        assert lines[0] == CONTENT_LINES[0]


def test_next(fname):
    """next() on an opened file returns consecutive lines."""
    with xopen(fname, "rt") as f:
        _ = next(f)
        line2 = next(f)
        assert line2 == 'The second line.\n', fname


def test_xopen_has_iter_method(ext, tmpdir):
    """Files opened for writing also expose __iter__."""
    path = str(tmpdir.join("out" + ext))
    with xopen(path, mode='w') as f:
        assert hasattr(f, '__iter__')


def test_pipedgzipwriter_has_iter_method(tmpdir):
    """PipedGzipWriter exposes __iter__."""
    with PipedGzipWriter(str(tmpdir.join("out.gz"))) as f:
        assert hasattr(f, '__iter__')


def test_iter_without_with(fname):
    """iter() works on a file that was opened without a ``with`` statement."""
    f = xopen(fname, "rt")
    it = iter(f)
    assert CONTENT_LINES[0] == next(it)
    f.close()


def test_pipedgzipreader_iter_without_with():
    """iter() works on a PipedGzipReader created without a ``with`` statement."""
    # Keep a reference to the reader so that it can be closed explicitly
    # instead of being leaked, mirroring test_iter_without_with.
    f = PipedGzipReader("tests/file.txt.gz")
    it = iter(f)
    assert CONTENT_LINES[0] == next(it)
    f.close()


@pytest.mark.parametrize("mode", ["rb", "rt"])
def test_pipedgzipreader_close(large_gzip, mode):
    """Leaving the ``with`` block after a partial read must not raise."""
    with PipedGzipReader(large_gzip, mode=mode) as f:
        f.readline()
        time.sleep(0.2)
    # The subprocess should be properly terminated now


def test_partial_gzip_iteration_closes_correctly(large_gzip):
    """Closing a file that was only partially read through a wrapper must not raise."""
    class LineReader:
        def __init__(self, file):
            self.file = xopen(file, "rb")

        def __iter__(self):
            wrapper = io.TextIOWrapper(self.file)
            yield from wrapper

    f = LineReader(large_gzip)
    next(iter(f))
    f.file.close()


def test_nonexisting_file(ext):
    """Opening a missing file for reading raises IOError."""
    with pytest.raises(IOError):
        with xopen('this-file-does-not-exist' + ext):
            pass  # pragma: no cover


def test_write_to_nonexisting_dir(ext):
    """Opening a file for writing in a missing directory raises IOError."""
    with pytest.raises(IOError):
        with xopen('this/path/does/not/exist/file.txt' + ext, 'w'):
            pass  # pragma: no cover


def test_invalid_mode():
    """An unknown mode string raises ValueError."""
    with pytest.raises(ValueError):
        with xopen("tests/file.txt.gz", mode="hallo"):
            pass  # pragma: no cover


def test_filename_not_a_string():
    """A filename that is neither str nor path-like raises TypeError."""
    with pytest.raises(TypeError):
        with xopen(123, mode="r"):
            pass  # pragma: no cover


def test_invalid_compression_level(tmpdir):
    """An out-of-range compresslevel raises ValueError naming the valid range."""
    path = str(tmpdir.join("out.gz"))
    with pytest.raises(ValueError) as e:
        with xopen(path, mode="w", compresslevel=17) as f:
            f.write("hello")  # pragma: no cover
    assert "between 1 and 9" in e.value.args[0]


@pytest.mark.parametrize("ext", extensions)
def test_append(ext, tmpdir):
    """Two binary appends are read back as one concatenated line."""
    text = b"AB"
    reference = text + text
    path = str(tmpdir.join("the-file" + ext))
    with xopen(path, "ab") as f:
        f.write(text)
    with xopen(path, "ab") as f:
        f.write(text)
    with xopen(path, "r") as f:
        # Exhaust the file; ``appended`` ends up holding its last line
        for appended in f:
            pass
        reference = reference.decode("utf-8")
        assert appended == reference


@pytest.mark.parametrize("ext", extensions)
def test_append_text(ext, tmpdir):
    """Two text appends are read back as one concatenated line."""
    text = "AB"
    reference = text + text
    path = str(tmpdir.join("the-file" + ext))
    with xopen(path, "at") as f:
        f.write(text)
    with xopen(path, "at") as f:
        f.write(text)
    with xopen(path, "rt") as f:
        # Exhaust the file; ``appended`` ends up holding its last line
        for appended in f:
            pass
        assert appended == reference


class TookTooLongError(Exception):
    """Raised by the ``timeout`` context manager when its alarm fires."""
    pass


class timeout:
    """
    Context manager that raises TookTooLongError if its body runs longer
    than ``seconds`` seconds. It relies on SIGALRM.
    """
    # copied from https://stackoverflow.com/a/22348885/715090
    def __init__(self, seconds=1):
        self.seconds = seconds
        self._previous_handler = None

    def handle_timeout(self, signum, frame):
        raise TookTooLongError()  # pragma: no cover

    def __enter__(self):
        # signal.signal() returns the handler that was installed before
        self._previous_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, exc_type, exc_value, traceback):
        # Cancel the pending alarm, then restore the previous handler so
        # that ours does not stay installed after the block.
        signal.alarm(0)
        # The previous handler is None if it was not installed from Python;
        # signal.signal() does not accept None, so leave it alone then.
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)


def test_truncated_gz(truncated_gzip):
    """read() on a truncated gzip file raises instead of hanging."""
    with timeout(seconds=2):
        with pytest.raises((EOFError, IOError)):
            f = xopen(truncated_gzip, "r")
            f.read()
            f.close()  # pragma: no cover


def test_truncated_gz_iter(truncated_gzip):
    """Iterating over a truncated gzip file raises instead of hanging."""
    with timeout(seconds=2):
        with pytest.raises((EOFError, IOError)):
            f = xopen(truncated_gzip, 'r')
            for line in f:
                pass
            f.close()  # pragma: no cover


def test_truncated_gz_with(truncated_gzip):
    """read() on a truncated gzip file raises when used in a ``with`` block."""
    with timeout(seconds=2):
        with pytest.raises((EOFError, IOError)):
            with xopen(truncated_gzip, 'r') as f:
                f.read()


def test_truncated_gz_iter_with(truncated_gzip):
    """Iterating a truncated gzip file raises when used in a ``with`` block."""
    with timeout(seconds=2):
        with pytest.raises((EOFError, IOError)):
            with xopen(truncated_gzip, 'r') as f:
                for line in f:
                    pass


def test_bare_read_from_gz():
    """read() returns the full text content of a gzip file."""
    with xopen('tests/hello.gz', 'rt') as f:
        assert f.read() == 'hello'


def test_read_piped_gzip():
    """PipedGzipReader.read() returns the full text content of a gzip file."""
    with PipedGzipReader('tests/hello.gz', 'rt') as f:
        assert f.read() == 'hello'


def test_write_pigz_threads(tmpdir):
    """Data written with threads=3 can be read back."""
    path = str(tmpdir.join('out.gz'))
    with xopen(path, mode='w', threads=3) as f:
        f.write('hello')
    with xopen(path) as f:
        assert f.read() == 'hello'


def test_read_gzip_no_threads():
    """Reading with threads=0 returns a gzip.GzipFile."""
    import gzip
    with xopen("tests/hello.gz", "rb", threads=0) as f:
        assert isinstance(f, gzip.GzipFile), f


def test_write_gzip_no_threads(tmpdir):
    """Writing with threads=0 returns a gzip.GzipFile."""
    import gzip
    path = str(tmpdir.join("out.gz"))
    with xopen(path, "wb", threads=0) as f:
        assert isinstance(f, gzip.GzipFile), f


def test_write_stdout():
    """Closing a file opened on '-' must leave stdout usable."""
    f = xopen('-', mode='w')
    print("Hello", file=f)
    f.close()
    # ensure stdout is not closed
    print("Still there?")


def test_write_stdout_contextmanager():
    # Do not close stdout
    with xopen('-', mode='w') as f:
        print("Hello", file=f)
    # ensure stdout is not closed
    print("Still there?")


def test_read_pathlib(fname):
    """pathlib.Path objects are accepted for reading in text mode."""
    path = Path(fname)
    with xopen(path, mode='rt') as f:
        assert f.read() == CONTENT


def test_read_pathlib_binary(fname):
    """pathlib.Path objects are accepted for reading in binary mode."""
    path = Path(fname)
    with xopen(path, mode='rb') as f:
        assert f.read() == bytes(CONTENT, 'ascii')


def test_write_pathlib(ext, tmpdir):
    """pathlib.Path objects are accepted for writing in text mode."""
    path = Path(str(tmpdir)) / ('hello.txt' + ext)
    with xopen(path, mode='wt') as f:
        f.write('hello')
    with xopen(path, mode='rt') as f:
        assert f.read() == 'hello'


def test_write_pathlib_binary(ext, tmpdir):
    """pathlib.Path objects are accepted for writing in binary mode."""
    path = Path(str(tmpdir)) / ('hello.txt' + ext)
    with xopen(path, mode='wb') as f:
        f.write(b'hello')
    with xopen(path, mode='rb') as f:
        assert f.read() == b'hello'


# lzma doesn’t work on PyPy3 at the moment
if lzma is not None:
    def test_detect_xz_file_format_from_content():
        """An xz file whose name does not end in .xz is still read correctly."""
        with xopen("tests/file.txt.xz.test", "rb") as fh:
            assert fh.readline() == CONTENT_LINES[0].encode("utf-8")


def test_concatenated_gzip_function():
    """_can_read_concatenated_gz is True for gzip and pigz but False for xz."""
    assert _can_read_concatenated_gz("gzip") is True
    assert _can_read_concatenated_gz("pigz") is True
    assert _can_read_concatenated_gz("xz") is False


@pytest.mark.skipif(
    not hasattr(fcntl, "F_GETPIPE_SZ") or _MAX_PIPE_SIZE is None,
    reason="Pipe size modifications not available on this platform.")
def test_pipesize_changed(tmpdir):
    """The file descriptor of the writer's ``_file`` reports a pipe size of _MAX_PIPE_SIZE."""
    path = Path(str(tmpdir), "hello.gz")
    with xopen(path, "wb") as f:
        assert isinstance(f, PipedCompressionWriter)
        assert fcntl.fcntl(f._file.fileno(),
                           fcntl.F_GETPIPE_SZ) == _MAX_PIPE_SIZE


def test_xopen_falls_back_to_gzip_open(lacking_pigz_permissions):
    """Reading a gzip file still works when PATH offers no executable pigz."""
    with xopen("tests/file.txt.gz", "rb") as f:
        assert f.readline() == CONTENT_LINES[0].encode("utf-8")


def test_open_many_gzip_writers(tmp_path):
    """Sixty gzip writers with threads=2 can be open at the same time."""
    writers = []
    try:
        for i in range(1, 61):
            path = tmp_path / "{:03d}.txt.gz".format(i)
            f = xopen(path, "wb", threads=2)
            # Register the writer before writing so that it gets closed even
            # if the write fails.
            writers.append(f)
            f.write(b"hello")
    finally:
        # Close whatever was opened, also when opening failed partway through
        for f in writers:
            f.close()