1import datetime 2import hashlib 3import logging 4import os 5import pprint 6import textwrap 7 8import pytest 9import salt.utils.files 10from tests.support.case import ModuleCase 11from tests.support.helpers import with_tempfile 12from tests.support.mixins import SaltReturnAssertsMixin 13from tests.support.runtests import RUNTIME_VARS 14from tests.support.unit import skipIf 15 16try: 17 import M2Crypto # pylint: disable=W0611 18 19 HAS_M2CRYPTO = True 20except ImportError: 21 HAS_M2CRYPTO = False 22 23log = logging.getLogger(__name__) 24 25 26@pytest.mark.usefixtures("salt_sub_minion") 27@skipIf(not HAS_M2CRYPTO, "Skip when no M2Crypto found") 28class x509Test(ModuleCase, SaltReturnAssertsMixin): 29 @classmethod 30 def setUpClass(cls): 31 cert_path = os.path.join(RUNTIME_VARS.BASE_FILES, "x509_test.crt") 32 with salt.utils.files.fopen(cert_path) as fp: 33 cls.x509_cert_text = fp.read() 34 35 def setUp(self): 36 with salt.utils.files.fopen( 37 os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "signing_policies.sls"), "w" 38 ) as fp: 39 fp.write( 40 textwrap.dedent( 41 """\ 42 x509_signing_policies: 43 ca_policy: 44 - minions: '*' 45 - signing_private_key: {0}/pki/ca.key 46 - signing_cert: {0}/pki/ca.crt 47 - O: Test Company 48 - basicConstraints: "CA:false" 49 - keyUsage: "critical digitalSignature, keyEncipherment" 50 - extendedKeyUsage: "critical serverAuth, clientAuth" 51 - subjectKeyIdentifier: hash 52 - authorityKeyIdentifier: keyid 53 - days_valid: 730 54 - copypath: {0}/pki 55 compound_match: 56 - minions: 'G@x509_test_grain:correct_value' 57 - signing_private_key: {0}/pki/ca.key 58 - signing_cert: {0}/pki/ca.crt 59 - O: Test Company 60 - basicConstraints: "CA:false" 61 - keyUsage: "critical digitalSignature, keyEncipherment" 62 - extendedKeyUsage: "critical serverAuth, clientAuth" 63 - subjectKeyIdentifier: hash 64 - authorityKeyIdentifier: keyid 65 - days_valid: 730 66 - copypath: {0}/pki 67 """.format( 68 RUNTIME_VARS.TMP 69 ) 70 ) 71 ) 72 with salt.utils.files.fopen( 73 os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "top.sls"), "w" 74 ) as fp: 75 fp.write( 76 textwrap.dedent( 77 """\ 78 base: 79 '*': 80 - signing_policies 81 """ 82 ) 83 ) 84 self.run_function("saltutil.refresh_pillar") 85 self.run_function( 86 "grains.set", ["x509_test_grain", "correct_value"], minion_tgt="sub_minion" 87 ) 88 self.run_function( 89 "grains.set", ["x509_test_grain", "not_correct_value"], minion_tgt="minion" 90 ) 91 92 def tearDown(self): 93 os.remove(os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "signing_policies.sls")) 94 os.remove(os.path.join(RUNTIME_VARS.TMP_PILLAR_TREE, "top.sls")) 95 certs_path = os.path.join(RUNTIME_VARS.TMP, "pki") 96 if os.path.exists(certs_path): 97 salt.utils.files.rm_rf(certs_path) 98 self.run_function("saltutil.refresh_pillar") 99 self.run_function("grains.delkey", ["x509_test_grain"], minion_tgt="sub_minion") 100 self.run_function("grains.delkey", ["x509_test_grain"], minion_tgt="minion") 101 102 def run_function(self, *args, **kwargs): # pylint: disable=arguments-differ 103 ret = super().run_function(*args, **kwargs) 104 return ret 105 106 @staticmethod 107 def file_checksum(path): 108 hash = hashlib.sha1() 109 with salt.utils.files.fopen(path, "rb") as f: 110 for block in iter(lambda: f.read(4096), b""): 111 hash.update(block) 112 return hash.hexdigest() 113 114 @with_tempfile(suffix=".pem", create=False) 115 @pytest.mark.slow_test 116 def test_issue_49027(self, pemfile): 117 ret = self.run_state("x509.pem_managed", name=pemfile, text=self.x509_cert_text) 118 assert isinstance(ret, dict), ret 119 ret = ret[next(iter(ret))] 120 assert ret.get("result") is True, ret 121 with salt.utils.files.fopen(pemfile) as fp: 122 result = fp.readlines() 123 self.assertEqual(self.x509_cert_text.splitlines(True), result) 124 125 @with_tempfile(suffix=".crt", create=False) 126 @with_tempfile(suffix=".key", create=False) 127 @pytest.mark.slow_test 128 def test_issue_49008(self, keyfile, crtfile): 129 ret = self.run_function( 130 "state.apply", 131 ["issue-49008"], 132 pillar={"keyfile": keyfile, "crtfile": crtfile}, 133 ) 134 assert isinstance(ret, dict), ret 135 for state_result in ret.values(): 136 assert state_result["result"] is True, state_result 137 assert os.path.exists(keyfile) 138 assert os.path.exists(crtfile) 139 140 @pytest.mark.slow_test 141 def test_cert_signing(self): 142 ret = self.run_function( 143 "state.apply", ["x509.cert_signing"], pillar={"tmp_dir": RUNTIME_VARS.TMP} 144 ) 145 key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format( 146 RUNTIME_VARS.TMP 147 ) 148 assert key in ret 149 assert "changes" in ret[key] 150 assert "Certificate" in ret[key]["changes"] 151 assert "New" in ret[key]["changes"]["Certificate"] 152 153 @pytest.mark.slow_test 154 def test_cert_signing_based_on_csr(self): 155 ret = self.run_function( 156 "state.apply", 157 ["x509.cert_signing_based_on_csr"], 158 pillar={"tmp_dir": RUNTIME_VARS.TMP}, 159 ) 160 key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format( 161 RUNTIME_VARS.TMP 162 ) 163 assert key in ret 164 assert "changes" in ret[key] 165 assert "Certificate" in ret[key]["changes"] 166 assert "New" in ret[key]["changes"]["Certificate"] 167 168 @pytest.mark.slow_test 169 def test_proper_cert_comparison(self): 170 # In this SLS we define two certs which have identical content. 171 # The first one is expected to be created. 172 # The second one is expected to be recognized as already present. 173 ret = self.run_function( 174 "state.apply", 175 ["x509.proper_cert_comparison"], 176 pillar={"tmp_dir": RUNTIME_VARS.TMP}, 177 ) 178 # check the first generated cert 179 first_key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format( 180 RUNTIME_VARS.TMP 181 ) 182 assert first_key in ret 183 assert "changes" in ret[first_key] 184 assert "Certificate" in ret[first_key]["changes"] 185 assert "New" in ret[first_key]["changes"]["Certificate"] 186 # check whether the second defined cert is considered to match the first one 187 second_key = ( 188 "x509_|-second_test_crt_|-{}/pki/test.crt_|-certificate_managed".format( 189 RUNTIME_VARS.TMP 190 ) 191 ) 192 assert second_key in ret 193 assert "changes" in ret[second_key] 194 assert ret[second_key]["changes"] == {} 195 196 @pytest.mark.slow_test 197 def test_crl_managed(self): 198 ret = self.run_function( 199 "state.apply", ["x509.crl_managed"], pillar={"tmp_dir": RUNTIME_VARS.TMP} 200 ) 201 key = "x509_|-{}/pki/ca.crl_|-{}/pki/ca.crl_|-crl_managed".format( 202 RUNTIME_VARS.TMP, RUNTIME_VARS.TMP 203 ) 204 205 # hints for easier debugging 206 # import json 207 # print(json.dumps(ret[key], indent=4, sort_keys=True)) 208 # print(ret[key]['comment']) 209 210 assert key in ret 211 assert "changes" in ret[key] 212 self.assertEqual(ret[key]["result"], True) 213 assert "New" in ret[key]["changes"] 214 assert "Revoked Certificates" in ret[key]["changes"]["New"] 215 self.assertEqual( 216 ret[key]["changes"]["Old"], 217 "{}/pki/ca.crl does not exist.".format(RUNTIME_VARS.TMP), 218 ) 219 220 @pytest.mark.slow_test 221 def test_crl_managed_replacing_existing_crl(self): 222 os.mkdir(os.path.join(RUNTIME_VARS.TMP, "pki")) 223 with salt.utils.files.fopen( 224 os.path.join(RUNTIME_VARS.TMP, "pki/ca.crl"), "wb" 225 ) as crl_file: 226 crl_file.write( 227 b"""-----BEGIN RSA PRIVATE KEY----- 228MIICWwIBAAKBgQCjdjbgL4kQ8Lu73xeRRM1q3C3K3ptfCLpyfw38LRnymxaoJ6ls 229pNSx2dU1uJ89YKFlYLo1QcEk4rJ2fdIjarV0kuNCY3rC8jYUp9BpAU5Z6p9HKeT1 2302rTPH81JyjbQDR5PyfCyzYOQtpwpB4zIUUK/Go7tTm409xGKbbUFugJNgQIDAQAB 231AoGAF24we34U1ZrMLifSRv5nu3OIFNZHyx2DLDpOFOGaII5edwgIXwxZeIzS5Ppr 232yO568/8jcdLVDqZ4EkgCwRTgoXRq3a1GLHGFmBdDNvWjSTTMLoozuM0t2zjRmIsH 233hUd7tnai9Lf1Bp5HlBEhBU2gZWk+SXqLvxXe74/+BDAj7gECQQDRw1OPsrgTvs3R 2343MNwX6W8+iBYMTGjn6f/6rvEzUs/k6rwJluV7n8ISNUIAxoPy5g5vEYK6Ln/Ttc7 235u0K1KNlRAkEAx34qcxjuswavL3biNGE+8LpDJnJx1jaNWoH+ObuzYCCVMusdT2gy 236kKuq9ytTDgXd2qwZpIDNmscvReFy10glMQJAXebMz3U4Bk7SIHJtYy7OKQzn0dMj 23735WnRV81c2Jbnzhhu2PQeAvt/i1sgEuzLQL9QEtSJ6wLJ4mJvImV0TdaIQJAAYyk 238TcKK0A8kOy0kMp3yvDHmJZ1L7wr7bBGIZPBlQ0Ddh8i1sJExm1gJ+uN2QKyg/XrK 239tDFf52zWnCdVGgDwcQJALW/WcbSEK+JVV6KDJYpwCzWpKIKpBI0F6fdCr1G7Xcwj 240c9bcgp7D7xD+TxWWNj4CSXEccJgGr91StV+gFg4ARQ== 241-----END RSA PRIVATE KEY----- 242""" 243 ) 244 245 ret = self.run_function( 246 "state.apply", ["x509.crl_managed"], pillar={"tmp_dir": RUNTIME_VARS.TMP} 247 ) 248 key = "x509_|-{}/pki/ca.crl_|-{}/pki/ca.crl_|-crl_managed".format( 249 RUNTIME_VARS.TMP, RUNTIME_VARS.TMP 250 ) 251 252 # hints for easier debugging 253 # import json 254 # print(json.dumps(ret[key], indent=4, sort_keys=True)) 255 # print(ret[key]['comment']) 256 257 assert key in ret 258 assert "changes" in ret[key] 259 self.assertEqual(ret[key]["result"], True) 260 assert "New" in ret[key]["changes"] 261 assert "Revoked Certificates" in ret[key]["changes"]["New"] 262 self.assertEqual( 263 ret[key]["changes"]["Old"], 264 "{}/pki/ca.crl is not a valid CRL.".format(RUNTIME_VARS.TMP), 265 ) 266 267 def test_cert_issue_not_before_not_after(self): 268 ret = self.run_function( 269 "state.apply", 270 ["test_cert_not_before_not_after"], 271 pillar={"tmp_dir": RUNTIME_VARS.TMP}, 272 ) 273 key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format( 274 RUNTIME_VARS.TMP 275 ) 276 assert key in ret 277 assert "changes" in ret[key] 278 assert "Certificate" in ret[key]["changes"] 279 assert "New" in ret[key]["changes"]["Certificate"] 280 assert "Not Before" in ret[key]["changes"]["Certificate"]["New"] 281 assert "Not After" in ret[key]["changes"]["Certificate"]["New"] 282 not_before = ret[key]["changes"]["Certificate"]["New"]["Not Before"] 283 not_after = ret[key]["changes"]["Certificate"]["New"]["Not After"] 284 assert not_before == "2019-05-05 00:00:00" 285 assert not_after == "2020-05-05 14:30:00" 286 287 def test_cert_issue_not_before(self): 288 ret = self.run_function( 289 "state.apply", 290 ["test_cert_not_before"], 291 pillar={"tmp_dir": RUNTIME_VARS.TMP}, 292 ) 293 key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format( 294 RUNTIME_VARS.TMP 295 ) 296 assert key in ret 297 assert "changes" in ret[key] 298 assert "Certificate" in ret[key]["changes"] 299 assert "New" in ret[key]["changes"]["Certificate"] 300 assert "Not Before" in ret[key]["changes"]["Certificate"]["New"] 301 assert "Not After" in ret[key]["changes"]["Certificate"]["New"] 302 not_before = ret[key]["changes"]["Certificate"]["New"]["Not Before"] 303 assert not_before == "2019-05-05 00:00:00" 304 305 def test_cert_issue_not_after(self): 306 ret = self.run_function( 307 "state.apply", ["test_cert_not_after"], pillar={"tmp_dir": RUNTIME_VARS.TMP} 308 ) 309 key = "x509_|-test_crt_|-{}/pki/test.crt_|-certificate_managed".format( 310 RUNTIME_VARS.TMP 311 ) 312 assert key in ret 313 assert "changes" in ret[key] 314 assert "Certificate" in ret[key]["changes"] 315 assert "New" in ret[key]["changes"]["Certificate"] 316 assert "Not Before" in ret[key]["changes"]["Certificate"]["New"] 317 assert "Not After" in ret[key]["changes"]["Certificate"]["New"] 318 not_after = ret[key]["changes"]["Certificate"]["New"]["Not After"] 319 assert not_after == "2020-05-05 14:30:00" 320 321 @with_tempfile(suffix=".crt", create=False) 322 @with_tempfile(suffix=".key", create=False) 323 def test_issue_41858(self, keyfile, crtfile): 324 ret_key = "x509_|-test_crt_|-{}_|-certificate_managed".format(crtfile) 325 signing_policy = "no_such_policy" 326 ret = self.run_function( 327 "state.apply", 328 ["issue-41858.gen_cert"], 329 pillar={ 330 "keyfile": keyfile, 331 "crtfile": crtfile, 332 "tmp_dir": RUNTIME_VARS.TMP, 333 }, 334 ) 335 self.assertTrue(ret[ret_key]["result"]) 336 cert_sum = self.file_checksum(crtfile) 337 338 ret = self.run_function( 339 "state.apply", 340 ["issue-41858.check"], 341 pillar={ 342 "keyfile": keyfile, 343 "crtfile": crtfile, 344 "signing_policy": signing_policy, 345 }, 346 ) 347 self.assertFalse(ret[ret_key]["result"]) 348 # self.assertSaltCommentRegexpMatches(ret[ret_key], "Signing policy {0} does not exist".format(signing_policy)) 349 self.assertEqual(self.file_checksum(crtfile), cert_sum) 350 351 @with_tempfile(suffix=".crt", create=False) 352 @with_tempfile(suffix=".key", create=False) 353 def test_compound_match_minion_have_correct_grain_value(self, keyfile, crtfile): 354 ret_key = "x509_|-test_crt_|-{}_|-certificate_managed".format(crtfile) 355 signing_policy = "compound_match" 356 ret = self.run_function( 357 "state.apply", 358 ["x509_compound_match.gen_ca"], 359 pillar={"tmp_dir": RUNTIME_VARS.TMP}, 360 ) 361 362 # sub_minion have grain set and CA is on other minion 363 # CA minion have same grain with incorrect value 364 ret = self.run_function( 365 "state.apply", 366 ["x509_compound_match.check"], 367 minion_tgt="sub_minion", 368 pillar={ 369 "keyfile": keyfile, 370 "crtfile": crtfile, 371 "signing_policy": signing_policy, 372 }, 373 ) 374 self.assertTrue(ret[ret_key]["result"]) 375 376 @with_tempfile(suffix=".crt", create=False) 377 @with_tempfile(suffix=".key", create=False) 378 def test_compound_match_ca_have_correct_grain_value(self, keyfile, crtfile): 379 self.run_function( 380 "grains.set", ["x509_test_grain", "correct_value"], minion_tgt="minion" 381 ) 382 self.run_function( 383 "grains.set", 384 ["x509_test_grain", "not_correct_value"], 385 minion_tgt="sub_minion", 386 ) 387 388 ret_key = "x509_|-test_crt_|-{}_|-certificate_managed".format(crtfile) 389 signing_policy = "compound_match" 390 self.run_function( 391 "state.apply", 392 ["x509_compound_match.gen_ca"], 393 pillar={"tmp_dir": RUNTIME_VARS.TMP}, 394 ) 395 396 ret = self.run_function( 397 "state.apply", 398 ["x509_compound_match.check"], 399 minion_tgt="sub_minion", 400 pillar={ 401 "keyfile": keyfile, 402 "crtfile": crtfile, 403 "signing_policy": signing_policy, 404 }, 405 ) 406 self.assertFalse(ret[ret_key]["result"]) 407 408 @with_tempfile(suffix=".crt", create=False) 409 @with_tempfile(suffix=".key", create=False) 410 def test_self_signed_cert(self, keyfile, crtfile): 411 """ 412 Self-signed certificate, no CA. 413 Run the state twice to confirm the cert is only created once 414 and its contents don't change. 415 """ 416 first_run = self.run_function( 417 "state.apply", 418 ["x509.self_signed"], 419 pillar={"keyfile": keyfile, "crtfile": crtfile}, 420 ) 421 key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile) 422 self.assertIn("New", first_run[key]["changes"]["Certificate"]) 423 self.assertEqual( 424 "Certificate is valid and up to date", 425 first_run[key]["changes"]["Status"]["New"], 426 ) 427 self.assertTrue(os.path.exists(crtfile), "Certificate was not created.") 428 429 with salt.utils.files.fopen(crtfile, "r") as first_cert: 430 cert_contents = first_cert.read() 431 432 second_run = self.run_function( 433 "state.apply", 434 ["x509.self_signed"], 435 pillar={"keyfile": keyfile, "crtfile": crtfile}, 436 ) 437 self.assertEqual({}, second_run[key]["changes"]) 438 with salt.utils.files.fopen(crtfile, "r") as second_cert: 439 self.assertEqual( 440 cert_contents, 441 second_cert.read(), 442 "Certificate contents should not have changed.", 443 ) 444 445 @with_tempfile(suffix=".crt", create=False) 446 @with_tempfile(suffix=".key", create=False) 447 def test_old_self_signed_cert_is_recreated(self, keyfile, crtfile): 448 """ 449 Self-signed certificate, no CA. 450 First create a cert that expires in 30 days, then recreate 451 the cert because the second state run requires days_remaining 452 to be at least 90. 453 """ 454 first_run = self.run_function( 455 "state.apply", 456 ["x509.self_signed_expiry"], 457 pillar={ 458 "keyfile": keyfile, 459 "crtfile": crtfile, 460 "days_valid": 30, 461 "days_remaining": 10, 462 }, 463 ) 464 key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile) 465 self.assertEqual( 466 "Certificate is valid and up to date", 467 first_run[key]["changes"]["Status"]["New"], 468 ) 469 expiry = datetime.datetime.strptime( 470 first_run[key]["changes"]["Certificate"]["New"]["Not After"], 471 "%Y-%m-%d %H:%M:%S", 472 ) 473 self.assertEqual(29, (expiry - datetime.datetime.now()).days) 474 self.assertTrue(os.path.exists(crtfile), "Certificate was not created.") 475 476 with salt.utils.files.fopen(crtfile, "r") as first_cert: 477 cert_contents = first_cert.read() 478 479 second_run = self.run_function( 480 "state.apply", 481 ["x509.self_signed_expiry"], 482 pillar={ 483 "keyfile": keyfile, 484 "crtfile": crtfile, 485 "days_valid": 180, 486 "days_remaining": 90, 487 }, 488 ) 489 self.assertEqual( 490 "Certificate needs renewal: 29 days remaining but it needs to be at" 491 " least 90", 492 second_run[key]["changes"]["Status"]["Old"], 493 ) 494 expiry = datetime.datetime.strptime( 495 second_run[key]["changes"]["Certificate"]["New"]["Not After"], 496 "%Y-%m-%d %H:%M:%S", 497 ) 498 self.assertEqual(179, (expiry - datetime.datetime.now()).days) 499 with salt.utils.files.fopen(crtfile, "r") as second_cert: 500 self.assertNotEqual( 501 cert_contents, 502 second_cert.read(), 503 "Certificate contents should have changed.", 504 ) 505 506 @with_tempfile(suffix=".crt", create=False) 507 @with_tempfile(suffix=".key", create=False) 508 def test_mismatched_self_signed_cert_is_recreated(self, keyfile, crtfile): 509 """ 510 Self-signed certificate, no CA. 511 First create a cert, then run the state again with a different 512 subjectAltName. The cert should be recreated. 513 Finally, run once more with the same subjectAltName as the 514 second run. Nothing should change. 515 """ 516 first_run = self.run_function( 517 "state.apply", 518 ["x509.self_signed_different_properties"], 519 pillar={ 520 "keyfile": keyfile, 521 "crtfile": crtfile, 522 "subjectAltName": "DNS:alt.service.local", 523 }, 524 ) 525 key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile) 526 self.assertEqual( 527 "Certificate is valid and up to date", 528 first_run[key]["changes"]["Status"]["New"], 529 ) 530 sans = first_run[key]["changes"]["Certificate"]["New"]["X509v3 Extensions"][ 531 "subjectAltName" 532 ] 533 self.assertEqual("DNS:alt.service.local", sans) 534 self.assertTrue(os.path.exists(crtfile), "Certificate was not created.") 535 536 with salt.utils.files.fopen(crtfile, "r") as first_cert: 537 first_cert_contents = first_cert.read() 538 539 second_run_pillar = { 540 "keyfile": keyfile, 541 "crtfile": crtfile, 542 "subjectAltName": "DNS:alt1.service.local, DNS:alt2.service.local", 543 } 544 second_run = self.run_function( 545 "state.apply", 546 ["x509.self_signed_different_properties"], 547 pillar=second_run_pillar, 548 ) 549 self.assertEqual( 550 "Certificate properties are different: X509v3 Extensions", 551 second_run[key]["changes"]["Status"]["Old"], 552 ) 553 sans = second_run[key]["changes"]["Certificate"]["New"]["X509v3 Extensions"][ 554 "subjectAltName" 555 ] 556 self.assertEqual("DNS:alt1.service.local, DNS:alt2.service.local", sans) 557 with salt.utils.files.fopen(crtfile, "r") as second_cert: 558 second_cert_contents = second_cert.read() 559 self.assertNotEqual( 560 first_cert_contents, 561 second_cert_contents, 562 "Certificate contents should have changed.", 563 ) 564 565 third_run = self.run_function( 566 "state.apply", 567 ["x509.self_signed_different_properties"], 568 pillar=second_run_pillar, 569 ) 570 self.assertEqual({}, third_run[key]["changes"]) 571 with salt.utils.files.fopen(crtfile, "r") as third_cert: 572 self.assertEqual( 573 second_cert_contents, 574 third_cert.read(), 575 "Certificate contents should not have changed.", 576 ) 577 578 @with_tempfile(suffix=".crt", create=False) 579 @with_tempfile(suffix=".key", create=False) 580 def test_certificate_managed_with_managed_private_key_does_not_error( 581 self, keyfile, crtfile 582 ): 583 """ 584 Test using the deprecated managed_private_key arg in certificate_managed does not throw an error. 585 586 TODO: Remove this test in Aluminium when the arg is removed. 587 """ 588 self.run_state("x509.private_key_managed", name=keyfile, bits=4096) 589 ret = self.run_state( 590 "x509.certificate_managed", 591 name=crtfile, 592 CN="localhost", 593 signing_private_key=keyfile, 594 managed_private_key={"name": keyfile, "bits": 4096}, 595 ) 596 key = "x509_|-{0}_|-{0}_|-certificate_managed".format(crtfile) 597 self.assertEqual(True, ret[key]["result"]) 598 599 @with_tempfile(suffix=".crt", create=False) 600 @with_tempfile(suffix=".key", create=False) 601 def test_file_properties_are_updated(self, keyfile, crtfile): 602 """ 603 Self-signed certificate, no CA. 604 First create a cert, then run the state again with different 605 file mode. The cert should not be recreated, but the file 606 should be updated. 607 Finally, run once more with the same file mode as the second 608 run. Nothing should change. 609 """ 610 first_run = self.run_function( 611 "state.apply", 612 ["x509.self_signed_different_properties"], 613 pillar={"keyfile": keyfile, "crtfile": crtfile, "fileMode": "0755"}, 614 ) 615 key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(crtfile) 616 self.assertEqual( 617 "Certificate is valid and up to date", 618 first_run[key]["changes"]["Status"]["New"], 619 ) 620 self.assertTrue(os.path.exists(crtfile), "Certificate was not created.") 621 self.assertEqual("0755", oct(os.stat(crtfile).st_mode)[-4:]) 622 623 second_run_pillar = { 624 "keyfile": keyfile, 625 "crtfile": crtfile, 626 "mode": "0600", 627 } 628 second_run = self.run_function( 629 "state.apply", 630 ["x509.self_signed_different_properties"], 631 pillar=second_run_pillar, 632 ) 633 self.assertEqual("0600", oct(os.stat(crtfile).st_mode)[-4:]) 634 635 third_run = self.run_function( 636 "state.apply", 637 ["x509.self_signed_different_properties"], 638 pillar=second_run_pillar, 639 ) 640 self.assertEqual({}, third_run[key]["changes"]) 641 self.assertEqual("0600", oct(os.stat(crtfile).st_mode)[-4:]) 642 643 @with_tempfile(suffix=".crt", create=False) 644 @with_tempfile(suffix=".key", create=False) 645 def test_file_managed_failure(self, keyfile, crtfile): 646 """ 647 Test that a failure in the file.managed call marks the state 648 call as failed. 649 """ 650 crtfile_pieces = os.path.split(crtfile) 651 bad_crtfile = os.path.join( 652 crtfile_pieces[0], "deeply/nested", crtfile_pieces[1] 653 ) 654 ret = self.run_function( 655 "state.apply", 656 ["x509.self_signed_file_error"], 657 pillar={"keyfile": keyfile, "crtfile": bad_crtfile}, 658 ) 659 660 key = "x509_|-self_signed_cert_|-{}_|-certificate_managed".format(bad_crtfile) 661 self.assertFalse(ret[key]["result"], "State should have failed.") 662 self.assertEqual({}, ret[key]["changes"]) 663 self.assertFalse( 664 os.path.exists(crtfile), "Certificate should not have been created." 665 ) 666 667 @with_tempfile(suffix=".crt", create=False) 668 @with_tempfile(suffix=".key", create=False) 669 def test_py2_generated_cert_is_not_recreated(self, keyfile, crtfile): 670 keyfile_contents = textwrap.dedent( 671 """\ 672 -----BEGIN RSA PRIVATE KEY----- 673 MIIEpAIBAAKCAQEAp5PQyx5NlYrfzd7vU/Xb2YR5qbWWtpWWoKmJC1gML5v5DBI7 674 +p/kAHNNmK8uqHXTaI4N/zgarfjrg4zceq2Du7pP0xiCAYolhFqF78ibxNrN4OkT 675 UPm2kM88iJ8Z14Yph8ueSxLIlujCGaEFhr6wRzTj4T9b+0Bb/PZHI2t5YwtIooVM 676 EFCBFkt4bb004tO0D9q0CPPVT2AsGmxnY43Aj3Epy++kqmaWj1hIucSprkDrAXFS 677 WacBQPFQ8XctnL2Z1Q6CJ5WUNrW8ohAJ9RJkwjiqbZTwYIPSSrl+FO3XqDY70SxU 678 3xDeqhU4zvyjxJ8w9SPqTUu/C3BZtRBT9dCBEQIDAQABAoIBAQCZvS23u1RYVrEe 679 sWGF+LA67aOkg9kCJ1iqiv8UrjF32DNy1KO8OcY2d5H/+u/mUzqh2HmU5QbtBsoi 680 xS9dSSTrLHGhbAGRogjrVRU9uCDYSBjLN2mmR4IrdkTF3pkZtpcRY0gU/eWTNXUl 681 iCmGxhj5KtfJxZQAfLon6FW5dBdIOgxSCJhvRq0zFpWJZFGWWkBExDfeNg//0fCU 682 UbjRjGacP/+R6FSJa6tevzgR7tIIapm1dY/ofPXIXsZGo1R87fRgLI1D+e84Jdds 683 /U0bKzPOgAjcC1b262lJ8058pjG/nqWC0YUfpIJUVv2ciJpH3Ha+90526InLAUXA 684 RWe1Z2YxAoGBANqACEKvUbxENu+XxQj0SI1co4SRTOvgbrSQGL61rDY6PvY/bOqC 685 JeR0KC3MN6e7fx52tsl/eqP9iyExUpO9b0BCnGg967MivJXWUxhUdOL/r2ceQBqD 686 DiPVZCFsjeNdSNihnNctAig9Po3GEUWE0ikHr3NcD+wXTnhnIEjJ/fltAoGBAMRW 687 dIcOiuDLm/oDLNCpwEO4m63ymbUgeOj2cZhKMTqFmspnKnuCU1U/A8cuQcs1gydL 688 7MzxVP7MZDIEqT5gGj3eyuVMAmKbvLFR2NctDIDjaUs6oz0J9NGByPNjXaYr4uMd 689 EZrxD8gLZ/G+/7eKsCgBA9ksSydDo00Vf/qAsmO1AoGBANWqc+l59eyrrCj5egU6 690 lKQf3gsp51WV/8v0SS5dC41vwdgdx80+/fz8FbpLRHVypWlN34sFbRFmQ6Juz/iH 691 O35UZQyO2KkxI8dGcbWOCUtditHExBzo4W/rIWKJ++pFc5Hb4DqO2dgto7kR4hvg 692 OX9D869UbIGLfQHCntBvLju1AoGAHpcl0sEmTD4NEFgcTGqWZTbHMsQAxOLJU+rJ 693 6iNtJiQY6P5H9TRqDXci/I6te57bz2yZ+ZiEWKq51b06LVjF3evviuhb2sdPEAWj 694 lmsTbqWAC1OYiXMarOXezGUn+zMNR7uIua5jehSk3lqW9x7psWHvGpA3KWf1cpYt 695 +XbB1J0CgYBCSjALTv4dcn+CtS3kqb806z8H9MSZznUwSmcgvwCR5sqwLAUk1xRn 696 hEqXbC1RGee3Xqv9mXPDK2LirpdRYi9Jr9ApZkrSkeaXSd2d4cy2ujUT0c7P8JrD 697 i6QXb+HaFeBuS5ulYDmo4mIbCysuTsgrLzplViUy3xUQv23M/Eh1gw== 698 -----END RSA PRIVATE KEY----- 699 """ 700 ) 701 crtfile_contents = textwrap.dedent( 702 """\ 703 -----BEGIN CERTIFICATE----- 704 MIIEhTCCA22gAwIBAgIIUijHgif6VJUwDQYJKoZIhvcNAQELBQAwgYIxCzAJBgNV 705 BAYTAkJFMRgwFgYDVQQDDA9FeGFtcGxlIFJvb3QgQ0ExETAPBgNVBAcMCEthcGVs 706 bGVuMRAwDgYDVQQIDAdBbnR3ZXJwMRAwDgYDVQQKDAdFeGFtcGxlMSIwIAYJKoZI 707 hvcNAQkBFhNjZXJ0YWRtQGV4YW1wbGUub3JnMB4XDTIwMDYxNjA3Mzk1OVoXDTMw 708 MDYxNDA3Mzk1OVowgYIxCzAJBgNVBAYTAkJFMRgwFgYDVQQDDA9FeGFtcGxlIFJv 709 b3QgQ0ExETAPBgNVBAcMCEthcGVsbGVuMRAwDgYDVQQIDAdBbnR3ZXJwMRAwDgYD 710 VQQKDAdFeGFtcGxlMSIwIAYJKoZIhvcNAQkBFhNjZXJ0YWRtQGV4YW1wbGUub3Jn 711 MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAp5PQyx5NlYrfzd7vU/Xb 712 2YR5qbWWtpWWoKmJC1gML5v5DBI7+p/kAHNNmK8uqHXTaI4N/zgarfjrg4zceq2D 713 u7pP0xiCAYolhFqF78ibxNrN4OkTUPm2kM88iJ8Z14Yph8ueSxLIlujCGaEFhr6w 714 RzTj4T9b+0Bb/PZHI2t5YwtIooVMEFCBFkt4bb004tO0D9q0CPPVT2AsGmxnY43A 715 j3Epy++kqmaWj1hIucSprkDrAXFSWacBQPFQ8XctnL2Z1Q6CJ5WUNrW8ohAJ9RJk 716 wjiqbZTwYIPSSrl+FO3XqDY70SxU3xDeqhU4zvyjxJ8w9SPqTUu/C3BZtRBT9dCB 717 EQIDAQABo4H8MIH5MA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgEGMB0G 718 A1UdDgQWBBTmNsYLuQTxpANgTuw7LRn1qHJsjzCBtgYDVR0jBIGuMIGrgBTmNsYL 719 uQTxpANgTuw7LRn1qHJsj6GBiKSBhTCBgjELMAkGA1UEBhMCQkUxGDAWBgNVBAMM 720 D0V4YW1wbGUgUm9vdCBDQTERMA8GA1UEBwwIS2FwZWxsZW4xEDAOBgNVBAgMB0Fu 721 dHdlcnAxEDAOBgNVBAoMB0V4YW1wbGUxIjAgBgkqhkiG9w0BCQEWE2NlcnRhZG1A 722 ZXhhbXBsZS5vcmeCCFIox4In+lSVMA0GCSqGSIb3DQEBCwUAA4IBAQBnC1/kK+xr 723 Vjr5Y2YRjyjm4e8I/nTU+RX2p5K+Yth3CqWO3JuDiV/31UMtPl832n2GWSgXG2pP 724 B52oeuCP4Re76jqhOmJWY3CKPji+Rs16wj199i9AAcwhSF0rpi5+Fi84HtP3q6pH 725 cuzZfIPW44aJ5l4k+QvTLoWzr0XujMFcYzI45i3SJqTMs8xdIP5YLN8JXtQSPw9Z 726 8/nBKbPj7WTUC9cj9Cw2bz+wTpdRF4XCsUF3Vpl9fP7SK8yvv0I85LZnWQx1eQlv 727 COAM5HWxUT9bWgv18zXdYkc6VLw6ufQSxxuhLMjJxuK27Ny/F18/xYLRTVnse36d 728 tPJrseUPmvIK 729 -----END CERTIFICATE----- 730 """ 731 ) 732 slsfile = textwrap.dedent( 733 """\ 734 {%- set ca_key_path = '""" 735 + keyfile 736 + """' %} 737 {%- set ca_crt_path = '""" 738 + crtfile 739 + """' %} 740 741 certificate.authority::private-key: 742 x509.private_key_managed: 743 - name: {{ ca_key_path }} 744 - backup: True 745 746 certificate.authority::certificate: 747 x509.certificate_managed: 748 - name: {{ ca_crt_path }} 749 - signing_private_key: {{ ca_key_path }} 750 - CN: Example Root CA 751 - O: Example 752 - C: BE 753 - ST: Antwerp 754 - L: Kapellen 755 - Email: certadm@example.org 756 - basicConstraints: "critical CA:true" 757 - keyUsage: "critical cRLSign, keyCertSign" 758 - subjectKeyIdentifier: hash 759 - authorityKeyIdentifier: keyid,issuer:always 760 - days_valid: 3650 761 - days_remaining: 0 762 - backup: True 763 - require: 764 - x509: certificate.authority::private-key 765 """ 766 ) 767 with salt.utils.files.fopen( 768 os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "cert.sls"), "w" 769 ) as wfh: 770 wfh.write(slsfile) 771 772 # Generate the certificate twice. 773 # On the first run, no key nor cert exist. 774 ret = self.run_function("state.sls", ["cert"]) 775 log.debug( 776 "First state run ret dictionary:\n%s", pprint.pformat(list(ret.values())) 777 ) 778 for state_run_id, state_run_details in ret.items(): 779 if state_run_id.endswith("private_key_managed"): 780 assert state_run_details["result"] 781 assert "new" in state_run_details["changes"] 782 if state_run_id.endswith("certificate_managed"): 783 assert state_run_details["result"] 784 assert "Certificate" in state_run_details["changes"] 785 assert "New" in state_run_details["changes"]["Certificate"] 786 assert "Status" in state_run_details["changes"] 787 assert "New" in state_run_details["changes"]["Status"] 788 # On the second run, they exist and should not trigger any modification 789 ret = self.run_function("state.sls", ["cert"]) 790 log.debug( 791 "Second state run ret dictionary:\n%s", pprint.pformat(list(ret.values())) 792 ) 793 for state_run_id, state_run_details in ret.items(): 794 if state_run_id.endswith("private_key_managed"): 795 assert state_run_details["result"] 796 assert state_run_details["changes"] == {} 797 if state_run_id.endswith("certificate_managed"): 798 assert state_run_details["result"] 799 assert state_run_details["changes"] == {} 800 # Now we repleace they key and cert contents with the contents of the above 801 # call, but under Py2 802 with salt.utils.files.fopen(keyfile, "w") as wfh: 803 wfh.write(keyfile_contents) 804 with salt.utils.files.fopen(keyfile) as rfh: 805 log.debug("Written keyfile, %r, contents:\n%s", keyfile, rfh.read()) 806 with salt.utils.files.fopen(crtfile, "w") as wfh: 807 wfh.write(crtfile_contents) 808 with salt.utils.files.fopen(crtfile) as rfh: 809 log.debug("Written crtfile, %r, contents:\n%s", crtfile, rfh.read()) 810 # We should not trigger any modification 811 ret = self.run_function("state.sls", ["cert"]) 812 log.debug( 813 "Third state run ret dictionary:\n%s", pprint.pformat(list(ret.values())) 814 ) 815 for state_run_id, state_run_details in ret.items(): 816 if state_run_id.endswith("private_key_managed"): 817 assert state_run_details["result"] 818 assert state_run_details["changes"] == {} 819 if state_run_id.endswith("certificate_managed"): 820 assert state_run_details["result"] 821 assert state_run_details["changes"] == {} 822