1# ------------------------------------ 2# Copyright (c) Microsoft Corporation. 3# Licensed under the MIT License. 4# ------------------------------------ 5import asyncio 6import base64 7import os 8from azure.identity.aio import DefaultAzureCredential 9from azure.keyvault.certificates.aio import CertificateClient 10from azure.keyvault.certificates import CertificatePolicy 11from azure.keyvault.secrets.aio import SecretClient 12from azure.core.exceptions import HttpResponseError 13from cryptography.hazmat.primitives.serialization import pkcs12 14 15# ---------------------------------------------------------------------------------------------------------- 16# Prerequisites: 17# 1. An Azure Key Vault. (https://docs.microsoft.com/en-us/azure/key-vault/quick-create-cli) 18# 19# 2. A service principal with certificate get, delete, and purge permissions, as well as secret get 20# permissions. 21# 22# 3. azure-keyvault-certificates, azure-keyvault-secrets, azure-identity, and cryptography (v3.3+) packages 23# (pip install these). 24# 25# 4. Set Environment variables AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET, VAULT_URL. (See 26# https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/keyvault/azure-keyvault-certificates#authenticate-the-client) 27# 28# ---------------------------------------------------------------------------------------------------------- 29# Sample - demonstrates how to get the private key of an existing Key Vault certificate 30# 31# 1. Create a new certificate (CertificateClient.create_certificate) 32# 33# 2. Get a certificate secret (SecretClient.get_secret) 34# 35# 3. Delete a certificate (CertificateClient.delete_certificate) 36# 37# 4. Purge a certificate (CertificateClient.purge_deleted_secret) 38# 39# ---------------------------------------------------------------------------------------------------------- 40 41async def run_sample(): 42 # Instantiate a certificate client that will be used to call the service. 43 # Notice that the client is using default Azure credentials. 44 # To make default credentials work, ensure that environment variables 'AZURE_CLIENT_ID', 45 # 'AZURE_CLIENT_SECRET' and 'AZURE_TENANT_ID' are set with the service principal credentials. 46 VAULT_URL = os.environ["VAULT_URL"] 47 credential = DefaultAzureCredential() 48 certificate_client = CertificateClient(vault_url=VAULT_URL, credential=credential) 49 50 # Instantiate a secret client that will be used to call the service. 51 # Notice that this client can reuse the credential object created above. 52 secret_client = SecretClient(vault_url=VAULT_URL, credential=credential) 53 try: 54 # Let's create a certificate in the vault. 55 # If the certificate already exists in the Key Vault, then a new version of the certificate is created. 56 print("\n.. Create certificate") 57 58 # Before creating your certificate, let's create the management policy for your certificate. 59 # Here we use the default policy. 60 cert_name = "PrivateKeyCertificate" 61 cert_policy = CertificatePolicy.get_default() 62 63 # Awaiting create_certificate will return the certificate as a KeyVaultCertificate 64 # if creation is successful, and the CertificateOperation if not. 65 created_certificate = await certificate_client.create_certificate( 66 certificate_name=cert_name, policy=cert_policy 67 ) 68 print("Certificate with name '{}' was created".format(created_certificate.name)) 69 70 # Key Vault also creates a secret with the same name as the created certificate. 71 # This secret contains protected information about the certificate, such as its private key. 72 print("\n.. Get a secret by name") 73 certificate_secret = await secret_client.get_secret(name=cert_name) 74 print("Certificate secret with name '{}' was found.".format(certificate_secret.name)) 75 76 # Now we can extract the private key and public certificate from the secret using the cryptography 77 # package. `additional_certificates` will be empty since the secret only contains one certificate. 78 # This example shows how to parse a certificate in PKCS12 format since it's the default in Key Vault, 79 # but PEM certificates are supported as well. With a PEM certificate, you could use load_pem_private_key 80 # in place of load_key_and_certificates. 81 cert_bytes = base64.b64decode(certificate_secret.value) 82 private_key, public_certificate, additional_certificates = pkcs12.load_key_and_certificates( 83 data=cert_bytes, 84 password=None 85 ) 86 print("Certificate with name '{}' was parsed.".format(certificate_secret.name)) 87 88 # Now we can clean up the vault by deleting, then purging, the certificate. 89 print("\n.. Delete certificate") 90 deleted_certificate = await certificate_client.delete_certificate(certificate_name=cert_name) 91 print("Certificate with name '{}' was deleted.".format(deleted_certificate.name)) 92 93 await certificate_client.purge_deleted_certificate(certificate_name=deleted_certificate.name) 94 print("Certificate with name '{}' is being purged.".format(deleted_certificate.name)) 95 96 except HttpResponseError as e: 97 print("\nrun_sample has caught an error. {}".format(e.message)) 98 99 finally: 100 print("\nrun_sample done") 101 await credential.close() 102 await certificate_client.close() 103 await secret_client.close() 104 105 106if __name__ == "__main__": 107 try: 108 loop = asyncio.get_event_loop() 109 loop.run_until_complete(run_sample()) 110 loop.close() 111 112 except Exception as e: 113 print("Top level error: {}".format(str(e))) 114