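# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import asyncio
import os
from azure.keyvault.certificates import CertificatePolicy
from azure.keyvault.certificates.aio import CertificateClient
from azure.identity.aio import DefaultAzureCredential
from azure.core.exceptions import HttpResponseError

# ----------------------------------------------------------------------------------------------------------
# Prerequisites:
# 1. An Azure Key Vault (https://docs.microsoft.com/en-us/azure/key-vault/quick-create-cli)
#
# 2. azure-keyvault-certificates and azure-identity packages (pip install these)
#
# 3. Set Environment variables AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET, VAULT_URL
#    (See https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/keyvault/azure-keyvault-keys#authenticate-the-client)
#
# ----------------------------------------------------------------------------------------------------------
# Sample - demonstrates the basic recover and purge operations on a vault(certificate) resource for Azure Key Vault
#
# 1. Create a certificate (create_certificate)
#
# 2. Delete a certificate (delete_certificate)
#
# 3. Recover a deleted certificate (recover_deleted_certificate)
#
# 4. Purge a deleted certificate (purge_deleted_certificate)
# ----------------------------------------------------------------------------------------------------------


async def run_sample():
    """Create, delete, recover and purge certificates in the vault named by VAULT_URL.

    The credential and the client are opened as async context managers so that both
    are closed on every exit path (the original closed them sequentially in a
    ``finally``, where a failing ``credential.close()`` would have skipped
    ``client.close()``).

    Raises:
        KeyError: if the VAULT_URL environment variable is not set.
        HttpResponseError: is caught and reported; a ``(NotSupported)`` response
            indicates the vault does not have soft-delete enabled, which recover
            and purge require.
    """
    # Instantiate a certificate client that will be used to call the service.
    # Notice that the client is using default Azure credentials.
    # To make default credentials work, ensure that environment variables 'AZURE_CLIENT_ID',
    # 'AZURE_CLIENT_SECRET' and 'AZURE_TENANT_ID' are set with the service principal credentials.
    vault_url = os.environ["VAULT_URL"]

    async with DefaultAzureCredential() as credential:
        async with CertificateClient(vault_url=vault_url, credential=credential) as client:
            try:
                # Let's create certificates holding storage and bank accounts credentials. If the certificate
                # already exists in the Key Vault, then a new version of the certificate is created.
                print("\n.. Create Certificates")
                bank_cert_name = "BankRecoverCertificate"
                storage_cert_name = "ServerRecoverCertificate"

                bank_certificate = await client.create_certificate(
                    certificate_name=bank_cert_name, policy=CertificatePolicy.get_default()
                )
                storage_certificate = await client.create_certificate(
                    certificate_name=storage_cert_name, policy=CertificatePolicy.get_default()
                )

                print("Certificate with name '{0}' was created.".format(bank_certificate.name))
                print("Certificate with name '{0}' was created.".format(storage_certificate.name))

                # The storage account was closed, need to delete its credentials from the Key Vault.
                print("\n.. Delete a Certificate")
                deleted_bank_certificate = await client.delete_certificate(bank_cert_name)
                # To ensure certificate is deleted on the server side.
                # NOTE(review): a fixed delay is a heuristic, not a guarantee that deletion has
                # completed; newer SDK versions may already wait for it inside delete_certificate — confirm.
                await asyncio.sleep(30)

                print(
                    "Certificate with name '{0}' was deleted on date {1}.".format(
                        deleted_bank_certificate.name, deleted_bank_certificate.deleted_on
                    )
                )

                # We accidentally deleted the bank account certificate. Let's recover it.
                # A deleted certificate can only be recovered if the Key Vault is soft-delete enabled.
                print("\n.. Recover Deleted Certificate")
                recovered_bank_certificate = await client.recover_deleted_certificate(deleted_bank_certificate.name)
                print("Recovered Certificate with name '{0}'.".format(recovered_bank_certificate.name))

                # Let's delete storage account now.
                # If the keyvault is soft-delete enabled, then for permanent deletion deleted certificate needs to be purged.
                await client.delete_certificate(storage_cert_name)

                # Certificates will still purge eventually on their scheduled purge date, but calling
                # `purge_deleted_certificate` immediately purges. A purge cannot be recovered from.
                print("\n.. Purge Deleted Certificate")
                await client.purge_deleted_certificate(storage_cert_name)
                print("Certificate has been permanently deleted.")

            except HttpResponseError as e:
                # The service reports a missing soft-delete configuration as "(NotSupported)".
                if "(NotSupported)" in e.message:
                    print("\n{0} Please enable soft delete on Key Vault to perform this operation.".format(e.message))
                else:
                    print("\nrun_sample has caught an error. {0}".format(e.message))

            finally:
                print("\nrun_sample done")


if __name__ == "__main__":
    try:
        # asyncio.run creates, runs and closes the loop; get_event_loop() without a running
        # loop is deprecated and the explicit loop.close() it required is no longer needed.
        asyncio.run(run_sample())

    except Exception as e:
        print("Top level Error: {0}".format(str(e)))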