1"""
2    tests.unit.utils.cache_test
3    ~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
5    Test the salt cache objects
6"""
7
8import time
9
10import pytest
11import salt.config
12import salt.loader
13import salt.payload
14import salt.utils.cache as cache
15import salt.utils.data
16import salt.utils.files
17
18
def test_sanity():
    """
    A CacheDict can be constructed and behaves like a plain dict for
    membership tests, assignment, lookup and deletion.
    """
    key, value = "foo", "bar"

    cache_dict = cache.CacheDict(5)
    assert isinstance(cache_dict, cache.CacheDict)

    # A fresh cache does not know the key yet
    assert key not in cache_dict

    # Storing a value makes it retrievable under the same key
    cache_dict[key] = value
    assert cache_dict[key] == value

    # Deleting the key removes it again
    del cache_dict[key]
    assert key not in cache_dict
32
33
def test_ttl():
    """
    A key stored in a CacheDict is gone once the cache has been left alone
    for longer than the value passed to the constructor.
    """
    key, value = "foo", "bar"

    cache_dict = cache.CacheDict(0.1)
    cache_dict[key] = value

    # Right after the write the entry is visible and readable
    assert key in cache_dict
    assert cache_dict[key] == value

    # Wait past the 0.1 given to the constructor; the entry must be gone
    time.sleep(0.2)
    assert key not in cache_dict

    # Looking the key up must fail exactly like a missing dict key
    with pytest.raises(KeyError):
        cache_dict[key]  # pylint: disable=pointless-statement
45
46
@pytest.fixture
def cache_dir(tmp_path):
    """
    Create a ``cachedir`` directory inside pytest's ``tmp_path`` and
    return its path.
    """
    directory = tmp_path.joinpath("cachedir")
    directory.mkdir()
    return directory
52
53
@pytest.fixture
def minion_config(cache_dir):
    """
    Return a copy of the default minion options with ``cachedir`` set to
    the temporary cache directory (as a string).
    """
    cachedir = str(cache_dir)
    config = salt.config.DEFAULT_MINION_OPTS.copy()
    config["cachedir"] = cachedir
    return config
59
60
def test_smoke_context(minion_config):
    """
    Smoke test: data handed to a ContextCache comes back unchanged
    """
    expected = {"a": "b"}
    ctx_cache = cache.ContextCache(minion_config, "cache_test")

    # Store a copy so ``expected`` itself is never handed to the cache
    ctx_cache.cache_context(expected.copy())
    restored = ctx_cache.get_cache_context()

    assert restored == expected
73
74
@pytest.fixture
def cache_mod_name():
    """
    Base name (without the ``.py`` extension) of the temporary module
    used by the context cache tests.
    """
    name = "cache_mod"
    return name
78
79
@pytest.fixture
def cache_mods_path(tmp_path, cache_mod_name):
    """
    Write a small module that uses ``salt.utils.cache.context_cache`` into
    ``<tmp_path>/cache_mods`` and yield that directory.

    The file is created through ``pytest.helpers.temp_file`` and only
    exists while the fixture is active.
    """
    # Source of the module under test: one function counts its calls in
    # ``__context__``, the other simply returns the current context.
    module_source = """
    import salt.utils.cache

    def __virtual__():
        return True

    @salt.utils.cache.context_cache
    def test_context_module():
        if "called" in __context__:
            __context__["called"] += 1
        else:
            __context__["called"] = 0
        return __context__.value()

    @salt.utils.cache.context_cache
    def test_compare_context():
        return __context__.value()
    """
    mods_dir = tmp_path / "cache_mods"
    module_filename = "{}.py".format(cache_mod_name)
    with pytest.helpers.temp_file(module_filename, module_source, mods_dir):
        yield mods_dir
105
106
def test_context_wrapper(minion_config, cache_mods_path):
    """
    A module function decorated with the context cache must be able to
    store its contextual data and read it back on the next call
    """
    search_paths = [str(cache_mods_path)]
    loader = salt.loader.LazyLoader(
        search_paths,
        tag="rawmodule",
        virtual_enable=False,
        opts=minion_config,
    )
    counting_func = loader["cache_mod.test_context_module"]

    # The module sets "called" to 0 on the first call and increments it
    # on every call after that.
    for expected_count in (0, 1):
        assert counting_func()["called"] == expected_count
125
126
def test_set_cache(minion_config, cache_mods_path, cache_mod_name, cache_dir):
    """
    Tests to ensure the cache is written correctly

    The wrapped module function is called three times, then the context
    cache file is checked both by de-serializing it by hand and by
    reading it through ``ContextCache``.
    """
    context = {"c": "d"}
    loader = salt.loader.LazyLoader(
        [str(cache_mods_path)],
        tag="rawmodule",
        virtual_enable=False,
        opts=minion_config,
        pack={"__context__": context, "__opts__": minion_config},
    )

    # Build the lookup key from the fixture so it stays in step with the
    # file name written by ``cache_mods_path``.
    cache_test_func = loader["{}.test_context_module".format(cache_mod_name)]

    # Call the function to trigger the context cache
    for expected_count in (0, 1, 2):
        assert cache_test_func()["called"] == expected_count

    # Name under which the loader registers the module; it is also the
    # name of the context cache and, with a ".p" suffix, of its file.
    loaded_name = "salt.loaded.ext.rawmodule.{}".format(cache_mod_name)
    cached_file = cache_dir / "context" / (loaded_name + ".p")
    assert cached_file.exists()

    # NOTE(review): the stored counter is asserted to be one behind the
    # last return value (1 on disk after the call that returned 2).
    # Presumably the decorator persists the context before invoking the
    # wrapped function - confirm against salt.utils.cache.context_cache.
    expected_cache = dict(context, called=1)

    # Test manual de-serialize
    target_cache_data = salt.utils.data.decode(
        salt.payload.loads(cached_file.read_bytes())
    )
    assert target_cache_data == expected_cache

    # Test cache de-serialize
    cc = cache.ContextCache(minion_config, loaded_name)
    retrieved_cache = cc.get_cache_context()
    assert retrieved_cache == expected_cache
165
166
def test_refill_cache(minion_config, cache_mods_path):
    """
    A wrapped function whose context has been emptied must get its
    context back from the context cache
    """
    context = {"c": "d"}
    packed = {"__context__": context, "__opts__": minion_config}
    loader = salt.loader.LazyLoader(
        [str(cache_mods_path)],
        tag="rawmodule",
        virtual_enable=False,
        opts=minion_config,
        pack=packed,
    )
    compare_func = loader["cache_mod.test_compare_context"]

    # The first call populates the cache and returns the packed context
    first_result = compare_func()
    assert first_result == context

    # Keep a snapshot, then empty the live context
    snapshot = dict(context)
    context.clear()

    # The second call must return what the context held before clearing
    second_result = compare_func()
    assert second_result == snapshot
192
193
def test_everything(tmp_path):
    """
    Walk a CacheDisk through instantiation, add, update, remove and expiry
    """
    key, value = "foo", "bar"
    disk_path = str(tmp_path / "cachedir")

    # Instantiation
    first = cache.CacheDisk(0.3, disk_path)
    assert isinstance(first, cache.CacheDisk)

    # Dict-like behaviour: missing -> stored -> deleted
    assert key not in first
    first[key] = value
    assert key in first
    assert first[key] == value
    del first[key]
    assert key not in first

    # Persistence: a second instance on the same path sees the entry
    first[key] = value
    second = cache.CacheDisk(0.3, disk_path)
    assert key in second
    assert second[key] == value

    # TTL: after sleeping longer than 0.3 both instances drop the entry
    time.sleep(0.5)
    for instance in (first, second):
        assert key not in instance
222