1import datetime
2import hashlib
3import logging
4import os
5import pprint
6import textwrap
7
8import pytest
9import salt.utils.files
10from tests.support.case import ModuleCase
11from tests.support.helpers import with_tempfile
12from tests.support.mixins import SaltReturnAssertsMixin
13from tests.support.runtests import RUNTIME_VARS
14from tests.support.unit import skipIf
15
16try:
17    import M2Crypto  # pylint: disable=W0611
18
19    HAS_M2CRYPTO = True
20except ImportError:
21    HAS_M2CRYPTO = False
22
23log = logging.getLogger(__name__)
24
25
26@pytest.mark.usefixtures("salt_sub_minion")
27@skipIf(not HAS_M2CRYPTO, "Skip when no M2Crypto found")
28class x509Test(ModuleCase, SaltReturnAssertsMixin):
29    @classmethod
30    def setUpClass(cls):
31        cert_path = os.path.join(RUNTIME_VARS.BASE_FILES, "x509_test.crt")
32        with salt.utils.files.fopen(cert_path) as fp:
33            cls.x509_cert_text = fp.read()
34
35    def setUp(self):
36        with salt.utils.files.fopen(
37            os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "signing_policies.sls"), "w"
38        ) as fp:
39            fp.write(
40                textwrap.dedent(
41                    """\
42                x509_signing_policies:
43                  ca_policy:
44                    - minions: '*'
45                    - signing_private_key: {0}/pki/ca.key
46                    - signing_cert: {0}/pki/ca.crt
47                    - O: Test Company
48                    - basicConstraints: "CA:false"
49                    - keyUsage: "critical digitalSignature, keyEncipherment"
50                    - extendedKeyUsage: "critical serverAuth, clientAuth"
51                    - subjectKeyIdentifier: hash
52                    - authorityKeyIdentifier: keyid
53                    - days_valid: 730
54                    - copypath: {0}/pki
55                  compound_match:
56                    - minions: 'G@x509_test_grain:correct_value'
57                    - signing_private_key: {0}/pki/ca.key
58                    - signing_cert: {0}/pki/ca.crt
59                    - O: Test Company
60                    - basicConstraints: "CA:false"
61                    - keyUsage: "critical digitalSignature, keyEncipherment"
62                    - extendedKeyUsage: "critical serverAuth, clientAuth"
63                    - subjectKeyIdentifier: hash
64                    - authorityKeyIdentifier: keyid
65                    - days_valid: 730
66                    - copypath: {0}/pki
67                     """.format(
68                        RUNTIME_VARS.TMP
69                    )
70                )
71            )
72        with salt.utils.files.fopen(
73            os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "top.sls"), "w"
74        ) as fp:
75            fp.write(
76                textwrap.dedent(
77                    """\
78                     base:
79                       '*':
80                         - signing_policies
81                     """
82                )
83            )
84        self.run_function("saltutil.refresh_pillar")
85        self.run_function(
86            "grains.set", ["x509_test_grain", "correct_value"], minion_tgt="sub_minion"
87        )
88        self.run_function(
89            "grains.set", ["x509_test_grain", "not_correct_value"], minion_tgt="minion"
90        )
91
92    def tearDown(self):
93        os.remove(os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "signing_policies.sls"))
94        os.remove(os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "top.sls"))
95        certs_path = os.path.join(RUNTIME_VARS.TMP, "pki")
96        if os.path.exists(certs_path):
97            salt.utils.files.rm_rf(certs_path)
98        self.run_function("saltutil.refresh_pillar")
99        self.run_function("grains.delkey", ["x509_test_grain"], minion_tgt="sub_minion")
100        self.run_function("grains.delkey", ["x509_test_grain"], minion_tgt="minion")
101
102    def run_function(self, *args, **kwargs):  # pylint: disable=arguments-differ
103        ret = super().run_function(*args, **kwargs)
104        return ret
105
106    @staticmethod
107    def file_checksum(path):
108        hash = hashlib.sha1()
109        with salt.utils.files.fopen(path, "rb") as f:
110            for block in iter(lambda: f.read(4096), b""):
111                hash.update(block)
112        return hash.hexdigest()
113
114    @with_tempfile(suffix=".pem", create=False)
115    @pytest.mark.slow_test
116    def test_issue_49027(self, pemfile):
117        ret = self.run_state("x509.pem_managed", name=pemfile, text=self.x509_cert_text)
118        assert isinstance(ret, dict), ret
119        ret = ret[next(iter(ret))]
120        assert ret.get("result") is True, ret
121        with salt.utils.files.fopen(pemfile) as fp:
122            result = fp.readlines()
123        self.assertEqual(self.x509_cert_text.splitlines(True), result)
124
125    @with_tempfile(suffix=".crt", create=False)
126    @with_tempfile(suffix=".key", create=False)
127    @pytest.mark.slow_test
128    def test_issue_49008(self, keyfile, crtfile):
129        ret = self.run_function(
130            "state.apply",
131            ["issue-49008"],
132            pillar={"keyfile": keyfile, "crtfile": crtfile},
133        )
134        assert isinstance(ret, dict), ret
135        for state_result in ret.values():
136            assert state_result["result"] is True, state_result
137        assert os.path.exists(keyfile)
138        assert os.path.exists(crtfile)
139
140    @pytest.mark.slow_test
141    def test_cert_signing(self):
142        ret = self.run_function(
143            "state.apply", ["x509.cert_signing"], pillar={"tmp_dir": RUNTIME_VARS.TMP}
144        )
145        key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
146            RUNTIME_VARS.TMP
147        )
148        assert key in ret
149        assert "changes" in ret[key]
150        assert "Certificate" in ret[key]["changes"]
151        assert "New" in ret[key]["changes"]["Certificate"]
152
153    @pytest.mark.slow_test
154    def test_cert_signing_based_on_csr(self):
155        ret = self.run_function(
156            "state.apply",
157            ["x509.cert_signing_based_on_csr"],
158            pillar={"tmp_dir": RUNTIME_VARS.TMP},
159        )
160        key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
161            RUNTIME_VARS.TMP
162        )
163        assert key in ret
164        assert "changes" in ret[key]
165        assert "Certificate" in ret[key]["changes"]
166        assert "New" in ret[key]["changes"]["Certificate"]
167
168    @pytest.mark.slow_test
169    def test_proper_cert_comparison(self):
170        # In this SLS we define two certs which have identical content.
171        # The first one is expected to be created.
172        # The second one is expected to be recognized as already present.
173        ret = self.run_function(
174            "state.apply",
175            ["x509.proper_cert_comparison"],
176            pillar={"tmp_dir": RUNTIME_VARS.TMP},
177        )
178        # check the first generated cert
179        first_key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
180            RUNTIME_VARS.TMP
181        )
182        assert first_key in ret
183        assert "changes" in ret[first_key]
184        assert "Certificate" in ret[first_key]["changes"]
185        assert "New" in ret[first_key]["changes"]["Certificate"]
186        # check whether the second defined cert is considered to match the first one
187        second_key = (
188            "x509_|-second_test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
189                RUNTIME_VARS.TMP
190            )
191        )
192        assert second_key in ret
193        assert "changes" in ret[second_key]
194        assert ret[second_key]["changes"] == {}
195
196    @pytest.mark.slow_test
197    def test_crl_managed(self):
198        ret = self.run_function(
199            "state.apply", ["x509.crl_managed"], pillar={"tmp_dir": RUNTIME_VARS.TMP}
200        )
201        key = "x509_|-{}/pki/ca.crl_|-{}/pki/ca.crl_|-crl_managed".format(
202            RUNTIME_VARS.TMP, RUNTIME_VARS.TMP
203        )
204
205        # hints for easier debugging
206        # import json
207        # print(json.dumps(ret[key], indent=4, sort_keys=True))
208        # print(ret[key]['comment'])
209
210        assert key in ret
211        assert "changes" in ret[key]
212        self.assertEqual(ret[key]["result"], True)
213        assert "New" in ret[key]["changes"]
214        assert "Revoked Certificates" in ret[key]["changes"]["New"]
215        self.assertEqual(
216            ret[key]["changes"]["Old"],
217            "{}/pki/ca.crl does not exist.".format(RUNTIME_VARS.TMP),
218        )
219
220    @pytest.mark.slow_test
221    def test_crl_managed_replacing_existing_crl(self):
222        os.mkdir(os.path.join(RUNTIME_VARS.TMP, "pki"))
223        with salt.utils.files.fopen(
224            os.path.join(RUNTIME_VARS.TMP, "pki/ca.crl"), "wb"
225        ) as crl_file:
226            crl_file.write(
227                b"""-----BEGIN RSA PRIVATE KEY-----
228MIICWwIBAAKBgQCjdjbgL4kQ8Lu73xeRRM1q3C3K3ptfCLpyfw38LRnymxaoJ6ls
229pNSx2dU1uJ89YKFlYLo1QcEk4rJ2fdIjarV0kuNCY3rC8jYUp9BpAU5Z6p9HKeT1
2302rTPH81JyjbQDR5PyfCyzYOQtpwpB4zIUUK/Go7tTm409xGKbbUFugJNgQIDAQAB
231AoGAF24we34U1ZrMLifSRv5nu3OIFNZHyx2DLDpOFOGaII5edwgIXwxZeIzS5Ppr
232yO568/8jcdLVDqZ4EkgCwRTgoXRq3a1GLHGFmBdDNvWjSTTMLoozuM0t2zjRmIsH
233hUd7tnai9Lf1Bp5HlBEhBU2gZWk+SXqLvxXe74/+BDAj7gECQQDRw1OPsrgTvs3R
2343MNwX6W8+iBYMTGjn6f/6rvEzUs/k6rwJluV7n8ISNUIAxoPy5g5vEYK6Ln/Ttc7
235u0K1KNlRAkEAx34qcxjuswavL3biNGE+8LpDJnJx1jaNWoH+ObuzYCCVMusdT2gy
236kKuq9ytTDgXd2qwZpIDNmscvReFy10glMQJAXebMz3U4Bk7SIHJtYy7OKQzn0dMj
23735WnRV81c2Jbnzhhu2PQeAvt/i1sgEuzLQL9QEtSJ6wLJ4mJvImV0TdaIQJAAYyk
238TcKK0A8kOy0kMp3yvDHmJZ1L7wr7bBGIZPBlQ0Ddh8i1sJExm1gJ+uN2QKyg/XrK
239tDFf52zWnCdVGgDwcQJALW/WcbSEK+JVV6KDJYpwCzWpKIKpBI0F6fdCr1G7Xcwj
240c9bcgp7D7xD+TxWWNj4CSXEccJgGr91StV+gFg4ARQ==
241-----END RSA PRIVATE KEY-----
242"""
243            )
244
245        ret = self.run_function(
246            "state.apply", ["x509.crl_managed"], pillar={"tmp_dir": RUNTIME_VARS.TMP}
247        )
248        key = "x509_|-{}/pki/ca.crl_|-{}/pki/ca.crl_|-crl_managed".format(
249            RUNTIME_VARS.TMP, RUNTIME_VARS.TMP
250        )
251
252        # hints for easier debugging
253        # import json
254        # print(json.dumps(ret[key], indent=4, sort_keys=True))
255        # print(ret[key]['comment'])
256
257        assert key in ret
258        assert "changes" in ret[key]
259        self.assertEqual(ret[key]["result"], True)
260        assert "New" in ret[key]["changes"]
261        assert "Revoked Certificates" in ret[key]["changes"]["New"]
262        self.assertEqual(
263            ret[key]["changes"]["Old"],
264            "{}/pki/ca.crl is not a valid CRL.".format(RUNTIME_VARS.TMP),
265        )
266
267    def test_cert_issue_not_before_not_after(self):
268        ret = self.run_function(
269            "state.apply",
270            ["test_cert_not_before_not_after"],
271            pillar={"tmp_dir": RUNTIME_VARS.TMP},
272        )
273        key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
274            RUNTIME_VARS.TMP
275        )
276        assert key in ret
277        assert "changes" in ret[key]
278        assert "Certificate" in ret[key]["changes"]
279        assert "New" in ret[key]["changes"]["Certificate"]
280        assert "Not Before" in ret[key]["changes"]["Certificate"]["New"]
281        assert "Not After" in ret[key]["changes"]["Certificate"]["New"]
282        not_before = ret[key]["changes"]["Certificate"]["New"]["Not Before"]
283        not_after = ret[key]["changes"]["Certificate"]["New"]["Not After"]
284        assert not_before == "2019-05-05 00:00:00"
285        assert not_after == "2020-05-05 14:30:00"
286
287    def test_cert_issue_not_before(self):
288        ret = self.run_function(
289            "state.apply",
290            ["test_cert_not_before"],
291            pillar={"tmp_dir": RUNTIME_VARS.TMP},
292        )
293        key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
294            RUNTIME_VARS.TMP
295        )
296        assert key in ret
297        assert "changes" in ret[key]
298        assert "Certificate" in ret[key]["changes"]
299        assert "New" in ret[key]["changes"]["Certificate"]
300        assert "Not Before" in ret[key]["changes"]["Certificate"]["New"]
301        assert "Not After" in ret[key]["changes"]["Certificate"]["New"]
302        not_before = ret[key]["changes"]["Certificate"]["New"]["Not Before"]
303        assert not_before == "2019-05-05 00:00:00"
304
305    def test_cert_issue_not_after(self):
306        ret = self.run_function(
307            "state.apply", ["test_cert_not_after"], pillar={"tmp_dir": RUNTIME_VARS.TMP}
308        )
309        key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format(
310            RUNTIME_VARS.TMP
311        )
312        assert key in ret
313        assert "changes" in ret[key]
314        assert "Certificate" in ret[key]["changes"]
315        assert "New" in ret[key]["changes"]["Certificate"]
316        assert "Not Before" in ret[key]["changes"]["Certificate"]["New"]
317        assert "Not After" in ret[key]["changes"]["Certificate"]["New"]
318        not_after = ret[key]["changes"]["Certificate"]["New"]["Not After"]
319        assert not_after == "2020-05-05 14:30:00"
320
321    @with_tempfile(suffix=".crt", create=False)
322    @with_tempfile(suffix=".key", create=False)
323    def test_issue_41858(self, keyfile, crtfile):
324        ret_key = "x509_|-test_crt_|-{}_|-certificate_managed".format(crtfile)
325        signing_policy = "no_such_policy"
326        ret = self.run_function(
327            "state.apply",
328            ["issue-41858.gen_cert"],
329            pillar={
330                "keyfile": keyfile,
331                "crtfile": crtfile,
332                "tmp_dir": RUNTIME_VARS.TMP,
333            },
334        )
335        self.assertTrue(ret[ret_key]["result"])
336        cert_sum = self.file_checksum(crtfile)
337
338        ret = self.run_function(
339            "state.apply",
340            ["issue-41858.check"],
341            pillar={
342                "keyfile": keyfile,
343                "crtfile": crtfile,
344                "signing_policy": signing_policy,
345            },
346        )
347        self.assertFalse(ret[ret_key]["result"])
348        # self.assertSaltCommentRegexpMatches(ret[ret_key], "Signing policy {0} does not exist".format(signing_policy))
349        self.assertEqual(self.file_checksum(crtfile), cert_sum)
350
351    @with_tempfile(suffix=".crt", create=False)
352    @with_tempfile(suffix=".key", create=False)
353    def test_compound_match_minion_have_correct_grain_value(self, keyfile, crtfile):
354        ret_key = "x509_|-test_crt_|-{}_|-certificate_managed".format(crtfile)
355        signing_policy = "compound_match"
356        ret = self.run_function(
357            "state.apply",
358            ["x509_compound_match.gen_ca"],
359            pillar={"tmp_dir": RUNTIME_VARS.TMP},
360        )
361
362        # sub_minion have grain set and CA is on other minion
363        # CA minion have same grain with incorrect value
364        ret = self.run_function(
365            "state.apply",
366            ["x509_compound_match.check"],
367            minion_tgt="sub_minion",
368            pillar={
369                "keyfile": keyfile,
370                "crtfile": crtfile,
371                "signing_policy": signing_policy,
372            },
373        )
374        self.assertTrue(ret[ret_key]["result"])
375
376    @with_tempfile(suffix=".crt", create=False)
377    @with_tempfile(suffix=".key", create=False)
378    def test_compound_match_ca_have_correct_grain_value(self, keyfile, crtfile):
379        self.run_function(
380            "grains.set", ["x509_test_grain", "correct_value"], minion_tgt="minion"
381        )
382        self.run_function(
383            "grains.set",
384            ["x509_test_grain", "not_correct_value"],
385            minion_tgt="sub_minion",
386        )
387
388        ret_key = "x509_|-test_crt_|-{}_|-certificate_managed".format(crtfile)
389        signing_policy = "compound_match"
390        self.run_function(
391            "state.apply",
392            ["x509_compound_match.gen_ca"],
393            pillar={"tmp_dir": RUNTIME_VARS.TMP},
394        )
395
396        ret = self.run_function(
397            "state.apply",
398            ["x509_compound_match.check"],
399            minion_tgt="sub_minion",
400            pillar={
401                "keyfile": keyfile,
402                "crtfile": crtfile,
403                "signing_policy": signing_policy,
404            },
405        )
406        self.assertFalse(ret[ret_key]["result"])
407
408    @with_tempfile(suffix=".crt", create=False)
409    @with_tempfile(suffix=".key", create=False)
410    def test_self_signed_cert(self, keyfile, crtfile):
411        """
412        Self-signed certificate, no CA.
413        Run the state twice to confirm the cert is only created once
414        and its contents don't change.
415        """
416        first_run = self.run_function(
417            "state.apply",
418            ["x509.self_signed"],
419            pillar={"keyfile": keyfile, "crtfile": crtfile},
420        )
421        key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile)
422        self.assertIn("New", first_run[key]["changes"]["Certificate"])
423        self.assertEqual(
424            "Certificate is valid and up to date",
425            first_run[key]["changes"]["Status"]["New"],
426        )
427        self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
428
429        with salt.utils.files.fopen(crtfile, "r") as first_cert:
430            cert_contents = first_cert.read()
431
432        second_run = self.run_function(
433            "state.apply",
434            ["x509.self_signed"],
435            pillar={"keyfile": keyfile, "crtfile": crtfile},
436        )
437        self.assertEqual({}, second_run[key]["changes"])
438        with salt.utils.files.fopen(crtfile, "r") as second_cert:
439            self.assertEqual(
440                cert_contents,
441                second_cert.read(),
442                "Certificate contents should not have changed.",
443            )
444
445    @with_tempfile(suffix=".crt", create=False)
446    @with_tempfile(suffix=".key", create=False)
447    def test_old_self_signed_cert_is_recreated(self, keyfile, crtfile):
448        """
449        Self-signed certificate, no CA.
450        First create a cert that expires in 30 days, then recreate
451        the cert because the second state run requires days_remaining
452        to be at least 90.
453        """
454        first_run = self.run_function(
455            "state.apply",
456            ["x509.self_signed_expiry"],
457            pillar={
458                "keyfile": keyfile,
459                "crtfile": crtfile,
460                "days_valid": 30,
461                "days_remaining": 10,
462            },
463        )
464        key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile)
465        self.assertEqual(
466            "Certificate is valid and up to date",
467            first_run[key]["changes"]["Status"]["New"],
468        )
469        expiry = datetime.datetime.strptime(
470            first_run[key]["changes"]["Certificate"]["New"]["Not After"],
471            "%Y-%m-%d %H:%M:%S",
472        )
473        self.assertEqual(29, (expiry - datetime.datetime.now()).days)
474        self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
475
476        with salt.utils.files.fopen(crtfile, "r") as first_cert:
477            cert_contents = first_cert.read()
478
479        second_run = self.run_function(
480            "state.apply",
481            ["x509.self_signed_expiry"],
482            pillar={
483                "keyfile": keyfile,
484                "crtfile": crtfile,
485                "days_valid": 180,
486                "days_remaining": 90,
487            },
488        )
489        self.assertEqual(
490            "Certificate needs renewal: 29 days remaining but it needs to be at"
491            " least 90",
492            second_run[key]["changes"]["Status"]["Old"],
493        )
494        expiry = datetime.datetime.strptime(
495            second_run[key]["changes"]["Certificate"]["New"]["Not After"],
496            "%Y-%m-%d %H:%M:%S",
497        )
498        self.assertEqual(179, (expiry - datetime.datetime.now()).days)
499        with salt.utils.files.fopen(crtfile, "r") as second_cert:
500            self.assertNotEqual(
501                cert_contents,
502                second_cert.read(),
503                "Certificate contents should have changed.",
504            )
505
506    @with_tempfile(suffix=".crt", create=False)
507    @with_tempfile(suffix=".key", create=False)
508    def test_mismatched_self_signed_cert_is_recreated(self, keyfile, crtfile):
509        """
510        Self-signed certificate, no CA.
511        First create a cert, then run the state again with a different
512        subjectAltName. The cert should be recreated.
513        Finally, run once more with the same subjectAltName as the
514        second run. Nothing should change.
515        """
516        first_run = self.run_function(
517            "state.apply",
518            ["x509.self_signed_different_properties"],
519            pillar={
520                "keyfile": keyfile,
521                "crtfile": crtfile,
522                "subjectAltName": "DNS:alt.service.local",
523            },
524        )
525        key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile)
526        self.assertEqual(
527            "Certificate is valid and up to date",
528            first_run[key]["changes"]["Status"]["New"],
529        )
530        sans = first_run[key]["changes"]["Certificate"]["New"]["X509v3 Extensions"][
531            "subjectAltName"
532        ]
533        self.assertEqual("DNS:alt.service.local", sans)
534        self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
535
536        with salt.utils.files.fopen(crtfile, "r") as first_cert:
537            first_cert_contents = first_cert.read()
538
539        second_run_pillar = {
540            "keyfile": keyfile,
541            "crtfile": crtfile,
542            "subjectAltName": "DNS:alt1.service.local, DNS:alt2.service.local",
543        }
544        second_run = self.run_function(
545            "state.apply",
546            ["x509.self_signed_different_properties"],
547            pillar=second_run_pillar,
548        )
549        self.assertEqual(
550            "Certificate properties are different: X509v3 Extensions",
551            second_run[key]["changes"]["Status"]["Old"],
552        )
553        sans = second_run[key]["changes"]["Certificate"]["New"]["X509v3 Extensions"][
554            "subjectAltName"
555        ]
556        self.assertEqual("DNS:alt1.service.local, DNS:alt2.service.local", sans)
557        with salt.utils.files.fopen(crtfile, "r") as second_cert:
558            second_cert_contents = second_cert.read()
559            self.assertNotEqual(
560                first_cert_contents,
561                second_cert_contents,
562                "Certificate contents should have changed.",
563            )
564
565        third_run = self.run_function(
566            "state.apply",
567            ["x509.self_signed_different_properties"],
568            pillar=second_run_pillar,
569        )
570        self.assertEqual({}, third_run[key]["changes"])
571        with salt.utils.files.fopen(crtfile, "r") as third_cert:
572            self.assertEqual(
573                second_cert_contents,
574                third_cert.read(),
575                "Certificate contents should not have changed.",
576            )
577
578    @with_tempfile(suffix=".crt", create=False)
579    @with_tempfile(suffix=".key", create=False)
580    def test_certificate_managed_with_managed_private_key_does_not_error(
581        self, keyfile, crtfile
582    ):
583        """
584        Test using the deprecated managed_private_key arg in certificate_managed does not throw an error.
585
586        TODO: Remove this test in Aluminium when the arg is removed.
587        """
588        self.run_state("x509.private_key_managed", name=keyfile, bits=4096)
589        ret = self.run_state(
590            "x509.certificate_managed",
591            name=crtfile,
592            CN="localhost",
593            signing_private_key=keyfile,
594            managed_private_key={"name": keyfile, "bits": 4096},
595        )
596        key = "x509_|-{0}_|-{0}_|-certificate_managed".format(crtfile)
597        self.assertEqual(True, ret[key]["result"])
598
599    @with_tempfile(suffix=".crt", create=False)
600    @with_tempfile(suffix=".key", create=False)
601    def test_file_properties_are_updated(self, keyfile, crtfile):
602        """
603        Self-signed certificate, no CA.
604        First create a cert, then run the state again with different
605        file mode. The cert should not be recreated, but the file
606        should be updated.
607        Finally, run once more with the same file mode as the second
608        run. Nothing should change.
609        """
610        first_run = self.run_function(
611            "state.apply",
612            ["x509.self_signed_different_properties"],
613            pillar={"keyfile": keyfile, "crtfile": crtfile, "fileMode": "0755"},
614        )
615        key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile)
616        self.assertEqual(
617            "Certificate is valid and up to date",
618            first_run[key]["changes"]["Status"]["New"],
619        )
620        self.assertTrue(os.path.exists(crtfile), "Certificate was not created.")
621        self.assertEqual("0755", oct(os.stat(crtfile).st_mode)[-4:])
622
623        second_run_pillar = {
624            "keyfile": keyfile,
625            "crtfile": crtfile,
626            "mode": "0600",
627        }
628        second_run = self.run_function(
629            "state.apply",
630            ["x509.self_signed_different_properties"],
631            pillar=second_run_pillar,
632        )
633        self.assertEqual("0600", oct(os.stat(crtfile).st_mode)[-4:])
634
635        third_run = self.run_function(
636            "state.apply",
637            ["x509.self_signed_different_properties"],
638            pillar=second_run_pillar,
639        )
640        self.assertEqual({}, third_run[key]["changes"])
641        self.assertEqual("0600", oct(os.stat(crtfile).st_mode)[-4:])
642
643    @with_tempfile(suffix=".crt", create=False)
644    @with_tempfile(suffix=".key", create=False)
645    def test_file_managed_failure(self, keyfile, crtfile):
646        """
647        Test that a failure in the file.managed call marks the state
648        call as failed.
649        """
650        crtfile_pieces = os.path.split(crtfile)
651        bad_crtfile = os.path.join(
652            crtfile_pieces[0], "deeply/nested", crtfile_pieces[1]
653        )
654        ret = self.run_function(
655            "state.apply",
656            ["x509.self_signed_file_error"],
657            pillar={"keyfile": keyfile, "crtfile": bad_crtfile},
658        )
659
660        key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(bad_crtfile)
661        self.assertFalse(ret[key]["result"], "State should have failed.")
662        self.assertEqual({}, ret[key]["changes"])
663        self.assertFalse(
664            os.path.exists(crtfile), "Certificate should not have been created."
665        )
666
667    @with_tempfile(suffix=".crt", create=False)
668    @with_tempfile(suffix=".key", create=False)
669    def test_py2_generated_cert_is_not_recreated(self, keyfile, crtfile):
670        keyfile_contents = textwrap.dedent(
671            """\
672        -----BEGIN RSA PRIVATE KEY-----
673        MIIEpAIBAAKCAQEAp5PQyx5NlYrfzd7vU/Xb2YR5qbWWtpWWoKmJC1gML5v5DBI7
674        +p/kAHNNmK8uqHXTaI4N/zgarfjrg4zceq2Du7pP0xiCAYolhFqF78ibxNrN4OkT
675        UPm2kM88iJ8Z14Yph8ueSxLIlujCGaEFhr6wRzTj4T9b+0Bb/PZHI2t5YwtIooVM
676        EFCBFkt4bb004tO0D9q0CPPVT2AsGmxnY43Aj3Epy++kqmaWj1hIucSprkDrAXFS
677        WacBQPFQ8XctnL2Z1Q6CJ5WUNrW8ohAJ9RJkwjiqbZTwYIPSSrl+FO3XqDY70SxU
678        3xDeqhU4zvyjxJ8w9SPqTUu/C3BZtRBT9dCBEQIDAQABAoIBAQCZvS23u1RYVrEe
679        sWGF+LA67aOkg9kCJ1iqiv8UrjF32DNy1KO8OcY2d5H/+u/mUzqh2HmU5QbtBsoi
680        xS9dSSTrLHGhbAGRogjrVRU9uCDYSBjLN2mmR4IrdkTF3pkZtpcRY0gU/eWTNXUl
681        iCmGxhj5KtfJxZQAfLon6FW5dBdIOgxSCJhvRq0zFpWJZFGWWkBExDfeNg//0fCU
682        UbjRjGacP/+R6FSJa6tevzgR7tIIapm1dY/ofPXIXsZGo1R87fRgLI1D+e84Jdds
683        /U0bKzPOgAjcC1b262lJ8058pjG/nqWC0YUfpIJUVv2ciJpH3Ha+90526InLAUXA
684        RWe1Z2YxAoGBANqACEKvUbxENu+XxQj0SI1co4SRTOvgbrSQGL61rDY6PvY/bOqC
685        JeR0KC3MN6e7fx52tsl/eqP9iyExUpO9b0BCnGg967MivJXWUxhUdOL/r2ceQBqD
686        DiPVZCFsjeNdSNihnNctAig9Po3GEUWE0ikHr3NcD+wXTnhnIEjJ/fltAoGBAMRW
687        dIcOiuDLm/oDLNCpwEO4m63ymbUgeOj2cZhKMTqFmspnKnuCU1U/A8cuQcs1gydL
688        7MzxVP7MZDIEqT5gGj3eyuVMAmKbvLFR2NctDIDjaUs6oz0J9NGByPNjXaYr4uMd
689        EZrxD8gLZ/G+/7eKsCgBA9ksSydDo00Vf/qAsmO1AoGBANWqc+l59eyrrCj5egU6
690        lKQf3gsp51WV/8v0SS5dC41vwdgdx80+/fz8FbpLRHVypWlN34sFbRFmQ6Juz/iH
691        O35UZQyO2KkxI8dGcbWOCUtditHExBzo4W/rIWKJ++pFc5Hb4DqO2dgto7kR4hvg
692        OX9D869UbIGLfQHCntBvLju1AoGAHpcl0sEmTD4NEFgcTGqWZTbHMsQAxOLJU+rJ
693        6iNtJiQY6P5H9TRqDXci/I6te57bz2yZ+ZiEWKq51b06LVjF3evviuhb2sdPEAWj
694        lmsTbqWAC1OYiXMarOXezGUn+zMNR7uIua5jehSk3lqW9x7psWHvGpA3KWf1cpYt
695        +XbB1J0CgYBCSjALTv4dcn+CtS3kqb806z8H9MSZznUwSmcgvwCR5sqwLAUk1xRn
696        hEqXbC1RGee3Xqv9mXPDK2LirpdRYi9Jr9ApZkrSkeaXSd2d4cy2ujUT0c7P8JrD
697        i6QXb+HaFeBuS5ulYDmo4mIbCysuTsgrLzplViUy3xUQv23M/Eh1gw==
698        -----END RSA PRIVATE KEY-----
699        """
700        )
701        crtfile_contents = textwrap.dedent(
702            """\
703        -----BEGIN CERTIFICATE-----
704        MIIEhTCCA22gAwIBAgIIUijHgif6VJUwDQYJKoZIhvcNAQELBQAwgYIxCzAJBgNV
705        BAYTAkJFMRgwFgYDVQQDDA9FeGFtcGxlIFJvb3QgQ0ExETAPBgNVBAcMCEthcGVs
706        bGVuMRAwDgYDVQQIDAdBbnR3ZXJwMRAwDgYDVQQKDAdFeGFtcGxlMSIwIAYJKoZI
707        hvcNAQkBFhNjZXJ0YWRtQGV4YW1wbGUub3JnMB4XDTIwMDYxNjA3Mzk1OVoXDTMw
708        MDYxNDA3Mzk1OVowgYIxCzAJBgNVBAYTAkJFMRgwFgYDVQQDDA9FeGFtcGxlIFJv
709        b3QgQ0ExETAPBgNVBAcMCEthcGVsbGVuMRAwDgYDVQQIDAdBbnR3ZXJwMRAwDgYD
710        VQQKDAdFeGFtcGxlMSIwIAYJKoZIhvcNAQkBFhNjZXJ0YWRtQGV4YW1wbGUub3Jn
711        MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAp5PQyx5NlYrfzd7vU/Xb
712        2YR5qbWWtpWWoKmJC1gML5v5DBI7+p/kAHNNmK8uqHXTaI4N/zgarfjrg4zceq2D
713        u7pP0xiCAYolhFqF78ibxNrN4OkTUPm2kM88iJ8Z14Yph8ueSxLIlujCGaEFhr6w
714        RzTj4T9b+0Bb/PZHI2t5YwtIooVMEFCBFkt4bb004tO0D9q0CPPVT2AsGmxnY43A
715        j3Epy++kqmaWj1hIucSprkDrAXFSWacBQPFQ8XctnL2Z1Q6CJ5WUNrW8ohAJ9RJk
716        wjiqbZTwYIPSSrl+FO3XqDY70SxU3xDeqhU4zvyjxJ8w9SPqTUu/C3BZtRBT9dCB
717        EQIDAQABo4H8MIH5MA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgEGMB0G
718        A1UdDgQWBBTmNsYLuQTxpANgTuw7LRn1qHJsjzCBtgYDVR0jBIGuMIGrgBTmNsYL
719        uQTxpANgTuw7LRn1qHJsj6GBiKSBhTCBgjELMAkGA1UEBhMCQkUxGDAWBgNVBAMM
720        D0V4YW1wbGUgUm9vdCBDQTERMA8GA1UEBwwIS2FwZWxsZW4xEDAOBgNVBAgMB0Fu
721        dHdlcnAxEDAOBgNVBAoMB0V4YW1wbGUxIjAgBgkqhkiG9w0BCQEWE2NlcnRhZG1A
722        ZXhhbXBsZS5vcmeCCFIox4In+lSVMA0GCSqGSIb3DQEBCwUAA4IBAQBnC1/kK+xr
723        Vjr5Y2YRjyjm4e8I/nTU+RX2p5K+Yth3CqWO3JuDiV/31UMtPl832n2GWSgXG2pP
724        B52oeuCP4Re76jqhOmJWY3CKPji+Rs16wj199i9AAcwhSF0rpi5+Fi84HtP3q6pH
725        cuzZfIPW44aJ5l4k+QvTLoWzr0XujMFcYzI45i3SJqTMs8xdIP5YLN8JXtQSPw9Z
726        8/nBKbPj7WTUC9cj9Cw2bz+wTpdRF4XCsUF3Vpl9fP7SK8yvv0I85LZnWQx1eQlv
727        COAM5HWxUT9bWgv18zXdYkc6VLw6ufQSxxuhLMjJxuK27Ny/F18/xYLRTVnse36d
728        tPJrseUPmvIK
729        -----END CERTIFICATE-----
730        """
731        )
732        slsfile = textwrap.dedent(
733            """\
734        {%- set ca_key_path = '"""
735            + keyfile
736            + """' %}
737        {%- set ca_crt_path = '"""
738            + crtfile
739            + """' %}
740
741        certificate.authority::private-key:
742          x509.private_key_managed:
743            - name: {{ ca_key_path }}
744            - backup: True
745
746        certificate.authority::certificate:
747          x509.certificate_managed:
748            - name: {{ ca_crt_path }}
749            - signing_private_key: {{ ca_key_path }}
750            - CN: Example Root CA
751            - O: Example
752            - C: BE
753            - ST: Antwerp
754            - L: Kapellen
755            - Email: certadm@example.org
756            - basicConstraints: "critical CA:true"
757            - keyUsage: "critical cRLSign, keyCertSign"
758            - subjectKeyIdentifier: hash
759            - authorityKeyIdentifier: keyid,issuer:always
760            - days_valid: 3650
761            - days_remaining: 0
762            - backup: True
763            - require:
764              - x509: certificate.authority::private-key
765        """
766        )
767        with salt.utils.files.fopen(
768            os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "cert.sls"), "w"
769        ) as wfh:
770            wfh.write(slsfile)
771
772        # Generate the certificate twice.
773        # On the first run, no key nor cert exist.
774        ret = self.run_function("state.sls", ["cert"])
775        log.debug(
776            "First state run ret dictionary:\n%s", pprint.pformat(list(ret.values()))
777        )
778        for state_run_id, state_run_details in ret.items():
779            if state_run_id.endswith("private_key_managed"):
780                assert state_run_details["result"]
781                assert "new" in state_run_details["changes"]
782            if state_run_id.endswith("certificate_managed"):
783                assert state_run_details["result"]
784                assert "Certificate" in state_run_details["changes"]
785                assert "New" in state_run_details["changes"]["Certificate"]
786                assert "Status" in state_run_details["changes"]
787                assert "New" in state_run_details["changes"]["Status"]
788        # On the second run, they exist and should not trigger any modification
789        ret = self.run_function("state.sls", ["cert"])
790        log.debug(
791            "Second state run ret dictionary:\n%s", pprint.pformat(list(ret.values()))
792        )
793        for state_run_id, state_run_details in ret.items():
794            if state_run_id.endswith("private_key_managed"):
795                assert state_run_details["result"]
796                assert state_run_details["changes"] == {}
797            if state_run_id.endswith("certificate_managed"):
798                assert state_run_details["result"]
799                assert state_run_details["changes"] == {}
800        # Now we repleace they key and cert contents with the contents of the above
801        # call, but under Py2
802        with salt.utils.files.fopen(keyfile, "w") as wfh:
803            wfh.write(keyfile_contents)
804        with salt.utils.files.fopen(keyfile) as rfh:
805            log.debug("Written keyfile, %r, contents:\n%s", keyfile, rfh.read())
806        with salt.utils.files.fopen(crtfile, "w") as wfh:
807            wfh.write(crtfile_contents)
808        with salt.utils.files.fopen(crtfile) as rfh:
809            log.debug("Written crtfile, %r, contents:\n%s", crtfile, rfh.read())
810        # We should not trigger any modification
811        ret = self.run_function("state.sls", ["cert"])
812        log.debug(
813            "Third state run ret dictionary:\n%s", pprint.pformat(list(ret.values()))
814        )
815        for state_run_id, state_run_details in ret.items():
816            if state_run_id.endswith("private_key_managed"):
817                assert state_run_details["result"]
818                assert state_run_details["changes"] == {}
819            if state_run_id.endswith("certificate_managed"):
820                assert state_run_details["result"]
821                assert state_run_details["changes"] == {}
822