1import logging
2import os
3from datetime import datetime
4
5import pytest
6import salt.serializers.json as jsonserializer
7import salt.serializers.msgpack as msgpackserializer
8import salt.serializers.plist as plistserializer
9import salt.serializers.python as pythonserializer
10import salt.serializers.yaml as yamlserializer
11import salt.states.file as filestate
12from tests.support.mock import MagicMock, call, patch
13
14try:
15    from dateutil.relativedelta import relativedelta
16
17    HAS_DATEUTIL = True
18except ImportError:
19    HAS_DATEUTIL = False
20
21NO_DATEUTIL_REASON = "python-dateutil is not installed"
22
23
24log = logging.getLogger(__name__)
25
26
@pytest.fixture
def configure_loader_modules():
    """
    Provide the dunder globals that ``salt.states.file`` expects when its
    functions are exercised outside a real Salt loader.

    Returns:
        dict: mapping of the ``filestate`` module to the loader attributes
        (``__salt__``, ``__serializers__``, ``__opts__``, ...) to patch in.
    """
    return {
        filestate: {
            "__env__": "base",
            "__salt__": {"file.manage_file": False},
            "__serializers__": {
                "yaml.serialize": yamlserializer.serialize,
                # Bug fix: this key was misspelled "yaml.seserialize" and
                # pointed at serialize(); the intended loader entry is the
                # YAML deserializer.
                "yaml.deserialize": yamlserializer.deserialize,
                "python.serialize": pythonserializer.serialize,
                "json.serialize": jsonserializer.serialize,
                "plist.serialize": plistserializer.serialize,
                "msgpack.serialize": msgpackserializer.serialize,
            },
            "__opts__": {"test": False, "cachedir": ""},
            "__instance_id__": "",
            "__low__": {},
            "__utils__": {},
        }
    }
47
48
@pytest.mark.skipif(not HAS_DATEUTIL, reason=NO_DATEUTIL_REASON)
@pytest.mark.slow_test
def test_retention_schedule():
    """
    Test to execute the retention_schedule logic.

    This test takes advantage of knowing which files it is generating,
    which means it can easily generate list of which files it should keep.

    Strategy: fabricate a directory listing of timestamp-named files, mock
    out ``file.readdir`` / ``file.lstat`` / ``file.remove`` and
    ``os.path.isdir``, then independently recompute which files the
    retention schedule should keep and compare against the state's return.
    """

    def generate_fake_files(
        format="example_name_%Y%m%dT%H%M%S.tar.bz2",
        starting=datetime(2016, 2, 8, 9),
        every=relativedelta(minutes=30),
        ending=datetime(2015, 12, 25),
        maxfiles=None,
    ):
        """
        For starting, make sure that it's over a week from the beginning of the month
        For every, pick only one of minutes, hours, days, weeks, months or years
        For ending, the further away it is from starting, the slower the tests run
        Full coverage requires over a year of separation, but that's painfully slow.

        Returns a list of filenames (newest first) produced by stepping
        backwards from an interval-aligned start until ``ending`` is reached
        or ``maxfiles`` names have been emitted.
        """

        # Align the first timestamp to the start of the interval bucket so
        # each generated file is the "first of" its period.
        # NOTE(review): relativedelta normalizes weeks into days, so
        # every=relativedelta(weeks=1) takes the .days branch below.
        if every.years:
            ts = datetime(starting.year, 1, 1)
        elif every.months:
            ts = datetime(starting.year, starting.month, 1)
        elif every.days:
            ts = datetime(starting.year, starting.month, starting.day)
        elif every.hours:
            ts = datetime(starting.year, starting.month, starting.day, starting.hour)
        elif every.minutes:
            ts = datetime(starting.year, starting.month, starting.day, starting.hour, 0)
        else:
            raise NotImplementedError("not sure what you're trying to do here")

        fake_files = []
        count = 0
        # Walk backwards in time, newest file first.
        while ending < ts:
            fake_files.append(ts.strftime(format=format))
            count += 1
            # Precedence note: this parses as
            #   (maxfiles and maxfiles == "all") or (maxfiles and count >= maxfiles)
            # so maxfiles == "all" short-circuits and stops after the first
            # (newest) file; a numeric maxfiles caps the list length.
            if maxfiles and maxfiles == "all" or maxfiles and count >= maxfiles:
                break
            ts -= every
        return fake_files

    fake_name = "/some/dir/name"
    # Retention policy under test: counts per bucket, "all" keeps every match.
    fake_retain = {
        "most_recent": 2,
        "first_of_hour": 4,
        "first_of_day": 7,
        "first_of_week": 6,
        "first_of_month": 6,
        "first_of_year": "all",
    }
    fake_strptime_format = "example_name_%Y%m%dT%H%M%S.tar.bz2"
    fake_matching_file_list = generate_fake_files()
    # Add some files which do not match fake_strptime_format
    fake_no_match_file_list = generate_fake_files(
        format="no_match_%Y%m%dT%H%M%S.tar.bz2", every=relativedelta(days=1)
    )

    def lstat_side_effect(path):
        # Fake file.lstat: derive st_mtime from the timestamp embedded in
        # the filename, so retention_schedule sees consistent mtimes.
        import re
        from time import mktime

        x = re.match(r"^[^\d]*(\d{8}T\d{6})\.tar\.bz2$", path).group(1)
        ts = mktime(datetime.strptime(x, "%Y%m%dT%H%M%S").timetuple())
        return {
            "st_atime": 0.0,
            "st_ctime": 0.0,
            "st_gid": 0,
            "st_mode": 33188,
            "st_mtime": ts,
            "st_nlink": 1,
            "st_size": 0,
            "st_uid": 0,
        }

    mock_t = MagicMock(return_value=True)
    mock_f = MagicMock(return_value=False)
    mock_lstat = MagicMock(side_effect=lstat_side_effect)
    mock_remove = MagicMock()

    def run_checks(isdir=mock_t, strptime_format=None, test=False):
        """
        Invoke filestate.retention_schedule() under the given mocks and
        compare its return against an independently computed expectation.

        isdir:           mock for os.path.isdir (mock_f simulates a bad name)
        strptime_format: when set, non-matching files are included and must
                         be reported as "ignored"
        test:            value for __opts__["test"] (dry-run mode)
        """
        # Default expectation covers the not-a-directory failure; the
        # success path overwrites changes/comment/result below.
        expected_ret = {
            "name": fake_name,
            "changes": {"retained": [], "deleted": [], "ignored": []},
            "result": True,
            "comment": "Name provided to file.retention must be a directory",
        }
        if strptime_format:
            fake_file_list = sorted(fake_matching_file_list + fake_no_match_file_list)
        else:
            fake_file_list = sorted(fake_matching_file_list)
        mock_readdir = MagicMock(return_value=fake_file_list)

        with patch.dict(filestate.__opts__, {"test": test}):
            with patch.object(os.path, "isdir", isdir):
                mock_readdir.reset_mock()
                with patch.dict(filestate.__salt__, {"file.readdir": mock_readdir}):
                    with patch.dict(filestate.__salt__, {"file.lstat": mock_lstat}):
                        mock_remove.reset_mock()
                        with patch.dict(
                            filestate.__salt__, {"file.remove": mock_remove}
                        ):
                            if strptime_format:
                                actual_ret = filestate.retention_schedule(
                                    fake_name,
                                    fake_retain,
                                    strptime_format=fake_strptime_format,
                                )
                            else:
                                actual_ret = filestate.retention_schedule(
                                    fake_name, fake_retain
                                )

        if not isdir():
            # Directory check failed: nothing should have been listed.
            mock_readdir.assert_has_calls([])
            expected_ret["result"] = False
        else:
            mock_readdir.assert_called_once_with(fake_name)
            ignored_files = fake_no_match_file_list if strptime_format else []
            # Recompute the retained set bucket by bucket, mirroring the
            # policy in fake_retain.
            retained_files = set(
                generate_fake_files(maxfiles=fake_retain["most_recent"])
            )
            junk_list = [
                ("first_of_hour", relativedelta(hours=1)),
                ("first_of_day", relativedelta(days=1)),
                ("first_of_week", relativedelta(weeks=1)),
                ("first_of_month", relativedelta(months=1)),
                ("first_of_year", relativedelta(years=1)),
            ]
            for retainable, retain_interval in junk_list:
                new_retains = set(
                    generate_fake_files(
                        maxfiles=fake_retain[retainable], every=retain_interval
                    )
                )
                # if we generate less than the number of files expected,
                # then the oldest file will also be retained
                # (correctly, since its the first in it's category)
                if (
                    fake_retain[retainable] == "all"
                    or len(new_retains) < fake_retain[retainable]
                ):
                    new_retains.add(fake_file_list[0])
                retained_files |= new_retains

            deleted_files = sorted(
                list(set(fake_file_list) - retained_files - set(ignored_files)),
                reverse=True,
            )
            retained_files = sorted(list(retained_files), reverse=True)
            expected_ret["changes"] = {
                "retained": retained_files,
                "deleted": deleted_files,
                "ignored": ignored_files,
            }
            if test:
                # Dry run: result is None and nothing is removed.
                expected_ret["result"] = None
                expected_ret[
                    "comment"
                ] = "{} backups would have been removed from {}.\n" "".format(
                    len(deleted_files), fake_name
                )
            else:
                expected_ret[
                    "comment"
                ] = "{} backups were removed from {}.\n" "".format(
                    len(deleted_files), fake_name
                )
                # Every deleted file must have been removed via file.remove.
                mock_remove.assert_has_calls(
                    [call(os.path.join(fake_name, x)) for x in deleted_files],
                    any_order=True,
                )

        assert actual_ret == expected_ret

    # Exercise: bad directory, normal run, dry run, and both again with an
    # explicit strptime_format (which adds "ignored" files).
    run_checks(isdir=mock_f)
    run_checks()
    run_checks(test=True)
    run_checks(strptime_format=fake_strptime_format)
    run_checks(strptime_format=fake_strptime_format, test=True)
234