1import io
2import os
3import random
4import shutil
5import signal
6import sys
7import time
8import pytest
9from pathlib import Path
10
11from xopen import xopen, PipedCompressionWriter, PipedGzipReader, \
12    PipedGzipWriter, _MAX_PIPE_SIZE, _can_read_concatenated_gz
13
# Suffixes appended to ``base`` to name the sample files; the empty string
# selects the plain ``file.txt``.
extensions = ["", ".gz", ".bz2"]

# The ".xz" suffix is only added when the lzma module can be imported.
try:
    import lzma
except ImportError:
    lzma = None
else:
    extensions.append(".xz")

# fcntl is optional. On Linux, when the module does not expose F_GETPIPE_SZ,
# the constant is set on the module here (value 1032).
try:
    import fcntl
except ImportError:
    fcntl = None
else:
    if sys.platform == "linux" and not hasattr(fcntl, "F_GETPIPE_SZ"):
        fcntl.F_GETPIPE_SZ = 1032

base = "tests/file.txt"
files = [base + suffix for suffix in extensions]
CONTENT_LINES = ['Testing, testing ...\n', 'The second line.\n']
CONTENT = ''.join(CONTENT_LINES)
33
34
@pytest.fixture(params=extensions)
def ext(request):
    """Provide one entry of ``extensions`` per parametrized run."""
    extension = request.param
    return extension
38
39
@pytest.fixture(params=files)
def fname(request):
    """Provide one entry of ``files`` per parametrized run."""
    sample_path = request.param
    return sample_path
43
44
@pytest.fixture
def lacking_pigz_permissions(tmp_path):
    """
    Point PATH at a directory holding a pigz copy whose permissions are 000.

    When ``shutil.which`` finds no pigz, nothing is copied and PATH simply
    points at the empty temporary directory. The previous PATH value is put
    back after the test.
    """
    real_pigz = shutil.which("pigz")
    if real_pigz:
        shutil.copy(real_pigz, str(tmp_path))
        # Strip every permission bit from the copy.
        os.chmod(str(tmp_path / "pigz"), 0)

    saved_path = os.environ["PATH"]
    os.environ["PATH"] = str(tmp_path)
    yield
    os.environ["PATH"] = saved_path
60
61
@pytest.fixture
def large_gzip(tmpdir):
    """Write repeated random text to ``large.gz`` via xopen and return its path."""
    alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ\n'
    gz_path = str(tmpdir.join("large.gz"))
    chunk = ''.join(random.choice(alphabet) for _ in range(1024))
    # Repeat the chunk so the text is a lot bigger, in order to ensure that
    # it is larger than the pipe buffer size.
    payload = chunk * 1024
    with xopen(gz_path, 'w') as writer:
        writer.write(payload)
    return gz_path
72
73
@pytest.fixture
def truncated_gzip(large_gzip):
    """Cut the last 10 bytes off the ``large_gzip`` file and return its path."""
    with open(large_gzip, 'a') as handle:
        shortened_size = os.stat(large_gzip).st_size - 10
        handle.truncate(shortened_size)
    return large_gzip
79
80
def test_xopen_text(fname):
    """Iterate a sample file in text mode; expect two lines."""
    with xopen(fname, 'rt') as handle:
        text_lines = list(handle)
        assert len(text_lines) == 2
        assert text_lines[1] == 'The second line.\n', fname
86
87
def test_xopen_binary(fname):
    """Iterate a sample file in binary mode; expect two lines of bytes."""
    with xopen(fname, 'rb') as handle:
        raw_lines = list(handle)
        assert len(raw_lines) == 2
        assert raw_lines[1] == b'The second line.\n', fname
93
94
def test_no_context_manager_text(fname):
    """Read in text mode without ``with`` and check ``closed`` after close()."""
    handle = xopen(fname, 'rt')
    text_lines = list(handle)
    assert len(text_lines) == 2
    assert text_lines[1] == 'The second line.\n', fname
    handle.close()
    assert handle.closed
102
103
def test_no_context_manager_binary(fname):
    """Read in binary mode without ``with`` and check ``closed`` after close()."""
    handle = xopen(fname, 'rb')
    raw_lines = list(handle)
    assert len(raw_lines) == 2
    assert raw_lines[1] == b'The second line.\n', fname
    handle.close()
    assert handle.closed
111
112
def test_readinto(fname):
    """Check that .readinto() fills a buffer with the full file content."""
    expected = CONTENT.encode('utf-8')
    # The buffer is deliberately larger than the content.
    buffer = bytearray(len(expected) + 100)
    with xopen(fname, 'rb') as handle:
        n_read = handle.readinto(buffer)
        assert n_read == len(expected)
        assert buffer[:n_read] == expected
121
122
def test_pipedgzipreader_readinto():
    """Check that PipedGzipReader.readinto fills a buffer with the content."""
    expected = CONTENT.encode('utf-8')
    # The buffer is deliberately larger than the content.
    buffer = bytearray(len(expected) + 100)
    with PipedGzipReader("tests/file.txt.gz", "rb") as reader:
        n_read = reader.readinto(buffer)
        assert n_read == len(expected)
        assert buffer[:n_read] == expected
131
132
def test_pipedgzipreader_textiowrapper():
    """Wrap a binary PipedGzipReader in io.TextIOWrapper and read it all."""
    with PipedGzipReader("tests/file.txt.gz", "rb") as reader:
        text_stream = io.TextIOWrapper(reader)
        decoded = text_stream.read()
        assert decoded == CONTENT
137
138
def test_detect_gzip_file_format_from_content():
    """Open a gzip sample whose name ends in ``.test`` and read its first line."""
    expected_first = CONTENT_LINES[0].encode("utf-8")
    with xopen("tests/file.txt.gz.test", "rb") as handle:
        assert handle.readline() == expected_first
142
143
def test_detect_bz2_file_format_from_content():
    """Open a bz2 sample whose name ends in ``.test`` and read its first line."""
    expected_first = CONTENT_LINES[0].encode("utf-8")
    with xopen("tests/file.txt.bz2.test", "rb") as handle:
        assert handle.readline() == expected_first
147
148
def test_readline(fname):
    """readline() in binary mode returns the first line as bytes."""
    expected = CONTENT_LINES[0].encode('utf-8')
    with xopen(fname, 'rb') as handle:
        line = handle.readline()
        assert line == expected
153
154
def test_readline_text(fname):
    """readline() in mode 'r' returns the first line as str."""
    with xopen(fname, 'r') as handle:
        line = handle.readline()
        assert line == CONTENT_LINES[0]
158
159
def test_readline_pipedgzipreader():
    """PipedGzipReader.readline() in binary mode returns the first line."""
    expected = CONTENT_LINES[0].encode('utf-8')
    with PipedGzipReader("tests/file.txt.gz", "rb") as reader:
        line = reader.readline()
        assert line == expected
164
165
def test_readline_text_pipedgzipreader():
    """PipedGzipReader.readline() in mode 'r' returns the first line as str."""
    with PipedGzipReader("tests/file.txt.gz", "r") as reader:
        line = reader.readline()
        assert line == CONTENT_LINES[0]
169
170
@pytest.mark.parametrize("threads", [None, 1, 2])
def test_pipedgzipreader_iter(threads):
    """Iterate a PipedGzipReader for several ``threads`` settings."""
    with PipedGzipReader("tests/file.txt.gz", mode="r", threads=threads) as reader:
        text_lines = list(reader)
        first = text_lines[0]
        assert first == CONTENT_LINES[0]
176
177
def test_next(fname):
    """Calling next() twice on the file object yields the second line."""
    with xopen(fname, "rt") as handle:
        next(handle)  # discard the first line
        second = next(handle)
        assert second == 'The second line.\n', fname
183
184
def test_xopen_has_iter_method(ext, tmpdir):
    """A file opened for writing with xopen exposes ``__iter__``."""
    out_path = str(tmpdir.join("out" + ext))
    with xopen(out_path, mode='w') as writer:
        has_iter = hasattr(writer, '__iter__')
        assert has_iter
189
190
def test_pipedgzipwriter_has_iter_method(tmpdir):
    """A PipedGzipWriter exposes ``__iter__``."""
    out_path = str(tmpdir.join("out.gz"))
    with PipedGzipWriter(out_path) as writer:
        has_iter = hasattr(writer, '__iter__')
        assert has_iter
194
195
def test_iter_without_with(fname):
    """iter() works on an xopen file object used without ``with``."""
    handle = xopen(fname, "rt")
    line_iter = iter(handle)
    first = next(line_iter)
    assert CONTENT_LINES[0] == first
    handle.close()
201
202
def test_pipedgzipreader_iter_without_with():
    """iter() works on a PipedGzipReader used without ``with``.

    The reader is kept in a variable and closed explicitly so that it is not
    left open after the test, whether or not the assertion passes.
    """
    reader = PipedGzipReader("tests/file.txt.gz")
    try:
        it = iter(reader)
        assert CONTENT_LINES[0] == next(it)
    finally:
        reader.close()
206
207
@pytest.mark.parametrize("mode", ["rb", "rt"])
def test_pipedgzipreader_close(large_gzip, mode):
    """Leave a PipedGzipReader context after reading only one line."""
    with PipedGzipReader(large_gzip, mode=mode) as reader:
        reader.readline()
        # Pause briefly before the context exits and closes the reader.
        time.sleep(0.2)
    # The subprocess should be properly terminated now
214
215
def test_partial_gzip_iteration_closes_correctly(large_gzip):
    """Close the underlying file after consuming one line via a wrapper."""

    class LineReader:
        """Iterate text lines of a file opened in binary mode with xopen."""

        def __init__(self, file):
            self.file = xopen(file, "rb")

        def __iter__(self):
            text_stream = io.TextIOWrapper(self.file)
            yield from text_stream

    line_reader = LineReader(large_gzip)
    # Pull exactly one line, then close the binary file object directly.
    next(iter(line_reader))
    line_reader.file.close()
228
229
def test_nonexisting_file(ext):
    """Opening a missing file must raise IOError."""
    missing = 'this-file-does-not-exist' + ext
    with pytest.raises(IOError), xopen(missing):
        pass  # pragma: no cover
234
235
def test_write_to_nonexisting_dir(ext):
    """Opening a file for writing in a missing directory must raise IOError."""
    target = 'this/path/does/not/exist/file.txt' + ext
    with pytest.raises(IOError), xopen(target, 'w'):
        pass  # pragma: no cover
240
241
def test_invalid_mode():
    """An unknown mode string must raise ValueError."""
    with pytest.raises(ValueError), xopen("tests/file.txt.gz", mode="hallo"):
        pass  # pragma: no cover
246
247
def test_filename_not_a_string():
    """Passing an integer as the filename must raise TypeError."""
    with pytest.raises(TypeError), xopen(123, mode="r"):
        pass  # pragma: no cover
252
253
def test_invalid_compression_level(tmpdir):
    """compresslevel=17 must raise a ValueError mentioning the valid range."""
    out_path = str(tmpdir.join("out.gz"))
    with pytest.raises(ValueError) as excinfo:
        with xopen(out_path, mode="w", compresslevel=17) as writer:
            writer.write("hello")  # pragma: no cover
    message = excinfo.value.args[0]
    assert "between 1 and 9" in message
260
261
@pytest.mark.parametrize("ext", extensions)
def test_append(ext, tmpdir):
    """Append the same bytes twice in mode 'ab' and read back both copies.

    The whole file content is compared against the reference. Comparing only
    the last iterated line would miss unexpected leading lines and would fail
    with a NameError instead of an assertion if the file yielded no lines.
    """
    text = b"AB"
    reference = (text + text).decode("utf-8")
    path = str(tmpdir.join("the-file" + ext))
    with xopen(path, "ab") as f:
        f.write(text)
    with xopen(path, "ab") as f:
        f.write(text)
    with xopen(path, "r") as f:
        assert f.read() == reference
276
277
@pytest.mark.parametrize("ext", extensions)
def test_append_text(ext, tmpdir):
    """Append the same text twice in mode 'at' and read back both copies.

    The whole file content is compared against the reference. Comparing only
    the last iterated line would miss unexpected leading lines and would fail
    with a NameError instead of an assertion if the file yielded no lines.
    """
    text = "AB"
    reference = text + text
    path = str(tmpdir.join("the-file" + ext))
    with xopen(path, "at") as f:
        f.write(text)
    with xopen(path, "at") as f:
        f.write(text)
    with xopen(path, "rt") as f:
        assert f.read() == reference
291
292
class TookTooLongError(Exception):
    """Raised by the ``timeout`` helper's alarm handler."""
295
296
class timeout:
    """Context manager that raises TookTooLongError when its body runs too long.

    It arms ``signal.alarm`` with a SIGALRM handler on entry. On exit the
    alarm is cancelled and the previously installed SIGALRM handler is put
    back, so the handler does not leak into code that runs afterwards.

    Args:
        seconds: Number of seconds passed to ``signal.alarm``.
    """

    # copied from https://stackoverflow.com/a/22348885/715090
    def __init__(self, seconds=1):
        self.seconds = seconds
        self._previous_handler = None

    def handle_timeout(self, signum, frame):
        """SIGALRM handler: abort the guarded block."""
        raise TookTooLongError()  # pragma: no cover

    def __enter__(self):
        self._previous_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Cancel the pending alarm first so it cannot fire while restoring.
        signal.alarm(0)
        # signal.signal() returns None when the previous handler was not
        # installed from Python; None cannot be passed back to it.
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)
311
312
def test_truncated_gz(truncated_gzip):
    """Reading a truncated gzip file raises EOFError or IOError within 2 s."""
    with timeout(seconds=2), pytest.raises((EOFError, IOError)):
        handle = xopen(truncated_gzip, "r")
        handle.read()
        handle.close()  # pragma: no cover
319
320
def test_truncated_gz_iter(truncated_gzip):
    """Iterating a truncated gzip file raises EOFError or IOError within 2 s."""
    with timeout(seconds=2), pytest.raises((EOFError, IOError)):
        handle = xopen(truncated_gzip, 'r')
        for _line in handle:
            pass
        handle.close()  # pragma: no cover
328
329
def test_truncated_gz_with(truncated_gzip):
    """As test_truncated_gz, but using the file as a context manager."""
    with timeout(seconds=2), pytest.raises((EOFError, IOError)):
        with xopen(truncated_gzip, 'r') as handle:
            handle.read()
335
336
def test_truncated_gz_iter_with(truncated_gzip):
    """As test_truncated_gz_iter, but using the file as a context manager."""
    with timeout(seconds=2), pytest.raises((EOFError, IOError)):
        with xopen(truncated_gzip, 'r') as handle:
            for _line in handle:
                pass
343
344
def test_bare_read_from_gz():
    """read() on hello.gz opened with xopen returns 'hello'."""
    with xopen('tests/hello.gz', 'rt') as handle:
        text = handle.read()
        assert text == 'hello'
348
349
def test_read_piped_gzip():
    """read() on hello.gz opened with PipedGzipReader returns 'hello'."""
    with PipedGzipReader('tests/hello.gz', 'rt') as reader:
        text = reader.read()
        assert text == 'hello'
353
354
def test_write_pigz_threads(tmpdir):
    """Write a gzip file with threads=3 and read the text back."""
    out_path = str(tmpdir.join('out.gz'))
    with xopen(out_path, mode='w', threads=3) as writer:
        writer.write('hello')
    with xopen(out_path) as reader:
        text = reader.read()
        assert text == 'hello'
361
362
def test_read_gzip_no_threads():
    """With threads=0, reading a .gz file yields a gzip.GzipFile."""
    import gzip
    with xopen("tests/hello.gz", "rb", threads=0) as handle:
        is_gzip_file = isinstance(handle, gzip.GzipFile)
        assert is_gzip_file, handle
367
368
def test_write_gzip_no_threads(tmpdir):
    """With threads=0, writing a .gz file yields a gzip.GzipFile."""
    import gzip
    out_path = str(tmpdir.join("out.gz"))
    with xopen(out_path, "wb", threads=0) as handle:
        is_gzip_file = isinstance(handle, gzip.GzipFile)
        assert is_gzip_file, handle
374
375
def test_write_stdout():
    """Closing the object returned for '-' must leave stdout usable."""
    stdout_handle = xopen('-', mode='w')
    print("Hello", file=stdout_handle)
    stdout_handle.close()
    # A further print only works if stdout itself was not closed.
    print("Still there?")
382
383
def test_write_stdout_contextmanager():
    """Leaving the ``with`` block for '-' must leave stdout usable."""
    with xopen('-', mode='w') as stdout_handle:
        print("Hello", file=stdout_handle)
    # A further print only works if stdout itself was not closed.
    print("Still there?")
390
391
def test_read_pathlib(fname):
    """xopen accepts a pathlib.Path when reading in text mode."""
    with xopen(Path(fname), mode='rt') as handle:
        text = handle.read()
        assert text == CONTENT
396
397
def test_read_pathlib_binary(fname):
    """xopen accepts a pathlib.Path when reading in binary mode."""
    expected = CONTENT.encode('ascii')
    with xopen(Path(fname), mode='rb') as handle:
        data = handle.read()
        assert data == expected
402
403
def test_write_pathlib(ext, tmpdir):
    """Write and re-read text through a pathlib.Path target."""
    target = Path(str(tmpdir), 'hello.txt' + ext)
    with xopen(target, mode='wt') as writer:
        writer.write('hello')
    with xopen(target, mode='rt') as reader:
        text = reader.read()
        assert text == 'hello'
410
411
def test_write_pathlib_binary(ext, tmpdir):
    """Write and re-read bytes through a pathlib.Path target."""
    target = Path(str(tmpdir), 'hello.txt' + ext)
    with xopen(target, mode='wb') as writer:
        writer.write(b'hello')
    with xopen(target, mode='rb') as reader:
        data = reader.read()
        assert data == b'hello'
418
419
# lzma doesn't work on PyPy3 at the moment, so the module may be unavailable.
# The test is reported as skipped in that case rather than not being defined.
@pytest.mark.skipif(lzma is None, reason="lzma module is not available")
def test_detect_xz_file_format_from_content():
    """Open an xz sample whose name ends in ``.test`` and read its first line."""
    with xopen("tests/file.txt.xz.test", "rb") as fh:
        assert fh.readline() == CONTENT_LINES[0].encode("utf-8")
425
426
def test_concatenated_gzip_function():
    """Check _can_read_concatenated_gz for gzip, pigz and xz."""
    expectations = (("gzip", True), ("pigz", True), ("xz", False))
    for program, expected in expectations:
        assert _can_read_concatenated_gz(program) is expected
431
432
@pytest.mark.skipif(
    not hasattr(fcntl, "F_GETPIPE_SZ") or _MAX_PIPE_SIZE is None,
    reason="Pipe size modifications not available on this platform.")
def test_pipesize_changed(tmpdir):
    """The writer's pipe size, queried via fcntl, equals _MAX_PIPE_SIZE."""
    gz_path = Path(str(tmpdir)) / "hello.gz"
    with xopen(gz_path, "wb") as writer:
        assert isinstance(writer, PipedCompressionWriter)
        pipe_size = fcntl.fcntl(writer._file.fileno(), fcntl.F_GETPIPE_SZ)
        assert pipe_size == _MAX_PIPE_SIZE
442
443
def test_xopen_falls_back_to_gzip_open(lacking_pigz_permissions):
    """Read a .gz file while the lacking_pigz_permissions fixture is active."""
    expected_first = CONTENT_LINES[0].encode("utf-8")
    with xopen("tests/file.txt.gz", "rb") as handle:
        assert handle.readline() == expected_first
447
448
def test_open_many_gzip_writers(tmp_path):
    """Keep 60 gzip writers (threads=2) open at once, then close them all.

    Each writer's ``close`` is registered on an ExitStack, so writers that
    were already opened are still closed if a later open or write fails.
    """
    from contextlib import ExitStack

    with ExitStack() as stack:
        for i in range(1, 61):
            path = tmp_path / "{:03d}.txt.gz".format(i)
            f = xopen(path, "wb", threads=2)
            # Register close() right after opening, before anything can fail.
            stack.callback(f.close)
            f.write(b"hello")
458