1# Copyright (c) 2018 The Pooch Developers.
2# Distributed under the terms of the BSD 3-Clause License.
3# SPDX-License-Identifier: BSD-3-Clause
4#
5# This code is part of the Fatiando a Terra project (https://www.fatiando.org)
6#
7"""
8Test the processor hooks
9"""
10from pathlib import Path
11from tempfile import TemporaryDirectory
12import warnings
13
14import pytest
15
16from .. import Pooch
17from ..processors import Unzip, Untar, ExtractorProcessor, Decompress
18
19from .utils import pooch_test_url, pooch_test_registry, check_tiny_data, capture_log
20
21
# Registry handed to every Pooch instance created in this module (passed as
# the ``registry`` argument). Built once at import time by the test utilities.
REGISTRY = pooch_test_registry()
# Base URL handed to every Pooch instance created in this module (passed as
# the ``base_url`` argument). Built once at import time by the test utilities.
BASEURL = pooch_test_url()
24
25
@pytest.mark.network
@pytest.mark.parametrize(
    "method,ext,name",
    [
        ("auto", "xz", None),
        ("lzma", "xz", None),
        ("xz", "xz", None),
        ("bzip2", "bz2", None),
        ("gzip", "gz", None),
        ("gzip", "gz", "different-name.txt"),
    ],
    ids=["auto", "lzma", "xz", "bz2", "gz", "name"],
)
def test_decompress(method, ext, name):
    """
    Check that decompression after download works for all formats.

    The first fetch must log exactly two lines (the download and the
    decompression, the latter mentioning the requested method). A second fetch
    of the same file must log nothing and return the same decompressed path.
    """
    archive = "tiny-data.txt." + ext
    decompressor = Decompress(method=method, name=name)
    with TemporaryDirectory() as tmp_dir:
        cache = Path(tmp_dir)
        # Without an explicit name, the output is the archive name + ".decomp"
        output_name = archive + ".decomp" if name is None else name
        expected = str(cache / output_name)
        # Setup a pooch in the temporary folder
        pup = Pooch(path=cache, base_url=BASEURL, registry=REGISTRY)
        # First fetch: one log line from the download, one from the processor
        with capture_log() as log_file:
            fetched = pup.fetch(archive, processor=decompressor)
            messages = log_file.getvalue().splitlines()
            assert len(messages) == 2
            download_msg, decompress_msg = messages
            assert download_msg.split()[0] == "Downloading"
            assert decompress_msg.startswith("Decompressing")
            assert method in decompress_msg
        assert fetched == expected
        check_tiny_data(fetched)
        # Second fetch: nothing is downloaded so the processor must stay quiet
        with capture_log() as log_file:
            fetched = pup.fetch(archive, processor=decompressor)
            assert log_file.getvalue() == ""
        assert fetched == expected
        check_tiny_data(fetched)
67
68
@pytest.mark.network
def test_decompress_fails():
    """
    Should fail if method='auto' and no extension is given in the file name.

    Also checks that an invalid method name fails and that the error message
    points to ``pooch.Unzip/Untar`` only when a zip archive/method is involved.
    """
    hint = "pooch.Unzip/Untar"
    # Each case: (file to fetch, method, start of the message, hint expected?)
    cases = [
        # Invalid extension
        ("tiny-data.txt", "auto", "Unrecognized file extension '.txt'", False),
        # Should also fail for a bad method name
        ("tiny-data.txt", "bla", "Invalid compression method 'bla'", False),
        # Point people to Untar and Unzip
        ("tiny-data.txt", "zip", "Invalid compression method 'zip'", True),
        ("store.zip", "auto", "Unrecognized file extension '.zip'", True),
    ]
    with TemporaryDirectory() as tmp_dir:
        pup = Pooch(path=Path(tmp_dir), base_url=BASEURL, registry=REGISTRY)
        for fname, method, prefix, hinted in cases:
            with pytest.raises(ValueError) as error, warnings.catch_warnings():
                pup.fetch(fname, processor=Decompress(method=method))
            message = error.value.args[0]
            assert message.startswith(prefix)
            assert (hint in message) == hinted
98
99
@pytest.mark.network
def test_extractprocessor_fails():
    """
    The base class should not be used directly: passing it to fetch must fail.
    """
    archive = "tiny-data.tar.gz"
    with TemporaryDirectory() as tmp_dir:
        # Setup a pooch in the temporary folder
        pup = Pooch(path=Path(tmp_dir), base_url=BASEURL, registry=REGISTRY)
        base_processor = ExtractorProcessor()
        # As instantiated, the error message must mention 'suffix'
        with pytest.raises(NotImplementedError) as error:
            pup.fetch(archive, processor=base_processor)
        assert "'suffix'" in error.value.args[0]
        # With a suffix set it must still fail, this time without a message
        base_processor.suffix = "tar.gz"
        with pytest.raises(NotImplementedError) as error:
            pup.fetch(archive, processor=base_processor)
        assert len(error.value.args) == 0
114
115
@pytest.mark.network
@pytest.mark.parametrize(
    "target_path", [None, "some_custom_path"], ids=["default_path", "custom_path"]
)
@pytest.mark.parametrize(
    "archive,members",
    [
        ("tiny-data", ["tiny-data.txt"]),
        ("store", None),
        ("store", ["store/tiny-data.txt"]),
        ("store", ["store/subdir/tiny-data.txt"]),
        ("store", ["store/subdir"]),
        ("store", ["store/tiny-data.txt", "store/subdir"]),
    ],
    ids=[
        "single_file",
        "archive_all",
        "archive_file",
        "archive_subdir_file",
        "archive_subdir",
        "archive_multiple",
    ],
)
@pytest.mark.parametrize(
    "processor_class,extension",
    [(Unzip, ".zip"), (Untar, ".tar.gz")],
    ids=["Unzip", "Untar"],
)
def test_unpacking(processor_class, extension, target_path, archive, members):
    """
    Tests the behaviour of processors for unpacking archives (Untar, Unzip).

    The archive is fetched twice with the same processor: the first fetch must
    produce the expected download/extraction log lines, the second one must
    log nothing. Both must return the same set of extracted file paths.
    """
    processor = processor_class(members=members, extract_dir=target_path)
    archive_name = archive + extension
    # When no folder is requested, the expected extraction folder is the
    # archive name followed by the processor suffix
    if target_path is None:
        extract_dir = archive_name + processor.suffix
    else:
        extract_dir = target_path
    with TemporaryDirectory() as tmp_dir:
        cache = Path(tmp_dir)
        true_paths, first_fetch_log = _unpacking_expected_paths_and_logs(
            archive, members, cache / extract_dir, processor_class.__name__
        )
        # Setup a pooch in the temporary folder
        pup = Pooch(path=cache, base_url=BASEURL, registry=REGISTRY)
        # Round one downloads and runs the processor; round two must be silent
        # because the processor doesn't execute when not downloading
        for expected_log in (first_fetch_log, []):
            with capture_log() as log_file:
                fnames = pup.fetch(archive_name, processor=processor)
                assert set(fnames) == true_paths
                _check_logs(log_file, expected_log)
            for fname in fnames:
                check_tiny_data(fname)
170
171
def _check_logs(log_file, expected_lines):
    """
    Assert that the captured log matches the expected lines.

    The log must have exactly as many lines as ``expected_lines`` and each
    logged line must start with the corresponding expected text.
    """
    logged = log_file.getvalue().splitlines()
    assert len(logged) == len(expected_lines)
    assert all(
        actual.startswith(prefix) for actual, prefix in zip(logged, expected_lines)
    )
180
181
def _unpacking_expected_paths_and_logs(archive, members, path, name):
    """
    Generate the appropriate expected paths and log message depending on the
    parameters for the test.

    Parameters
    ----------
    archive : str
        Name of the archive without extension: ``"tiny-data"`` or ``"store"``.
    members : list of str or None
        Archive members requested from the processor (``"/"`` separated), or
        None when the whole archive is unpacked.
    path : pathlib.Path
        Folder where the archive contents are expected to be extracted.
    name : str
        Name of the processor class (e.g. ``"Unzip"``), used to build the log
        message when the whole archive is unpacked.

    Returns
    -------
    true_paths : set of str
        The paths of the files expected to be returned by the processor.
    log_lines : list of str
        The expected start of each log line, in order.

    Raises
    ------
    ValueError
        If *archive* is not one of the known test archives.
    """
    log_lines = ["Downloading"]
    if archive == "tiny-data":
        true_paths = {str(path / "tiny-data.txt")}
        log_lines.append("Extracting 'tiny-data.txt'")
    elif archive == "store":
        if members is None:
            true_paths = {
                str(path / "store" / "tiny-data.txt"),
                str(path / "store" / "subdir" / "tiny-data.txt"),
            }
            # Doubling the last letter of the class name gives the gerund:
            # "Unzip" -> "Unzipping", "Untar" -> "Untarring"
            log_lines.append(f"{name}{name[-1]}ing contents")
        else:
            true_paths = {str(_expected_member_file(path, member)) for member in members}
            log_lines.extend(f"Extracting '{member}'" for member in members)
    else:
        # Fail with a clear message instead of an UnboundLocalError on return
        raise ValueError(
            f"Unknown test archive '{archive}'. Must be 'tiny-data' or 'store'."
        )
    return true_paths, log_lines


def _expected_member_file(path, member):
    """
    Return the file expected inside *path* for a single archive *member*.
    """
    member_path = path / Path(*member.split("/"))
    # A member that isn't the data file itself is taken to be a folder, in
    # which case the file expected from it is the "tiny-data.txt" inside
    if not str(member_path).endswith("tiny-data.txt"):
        member_path = member_path / "tiny-data.txt"
    return member_path
207