import time
from azure.mgmt.resource import ResourceManagementClient
from devtools_testutils import AzureMgmtTestCase
from azure.mgmt.netapp.models import Backup, BackupPatch, VolumePatch
from test_account import delete_account
from test_volume import create_volume, wait_for_volume, delete_volume, delete_pool
from setup import *
import azure.mgmt.netapp.models
import unittest

backups = [TEST_BACKUP_1, TEST_BACKUP_2]


def create_backup(client, backup_name=TEST_BACKUP_1, rg=TEST_RG, account_name=TEST_ACC_1, pool_name=TEST_POOL_1,
                  volume_name=TEST_VOL_1, location=LOCATION, backup_only=False, live=False):
    """Enable backups on a volume and create one backup of it.

    Unless ``backup_only`` is true, the volume is created first (and waited
    for). The volume is then patched to enable backups against the first
    vault listed for the account, the backup is created, and the function
    polls until the backup reports a ``Succeeded`` provisioning state (or the
    polling budget of ``wait_for_backup_created`` is exhausted).

    :param client: NetApp management client used for every call.
    :param backup_name: name of the backup to create.
    :param rg: resource group name.
    :param account_name: NetApp account name.
    :param pool_name: capacity pool name.
    :param volume_name: volume name.
    :param location: location passed to the volume helper and the backup body.
    :param backup_only: skip volume creation when the volume already exists.
    :param live: when true, the wait helpers sleep between polls.
    :return: the result of the ``begin_create`` long-running operation.
    :raises StopIteration: if the account has no vault to attach backups to.
    """
    if not backup_only:
        create_volume(client, rg, account_name, pool_name, volume_name, location, vnet=VNET, live=live)
        wait_for_volume(client, rg, account_name, pool_name, volume_name, live)

    vaults = client.vaults.list(rg, account_name)
    volume_patch = VolumePatch(data_protection={
        "backup": {
            # Use the first vault returned for the account.
            "vaultId": next(vaults).id,
            "backupEnabled": True
        }
    })
    # Patch the volume that was actually requested, not the module defaults.
    client.volumes.begin_update(rg, account_name, pool_name, volume_name, volume_patch).result()
    backup_body = Backup(location=location)
    backup = client.backups.begin_create(rg, account_name, pool_name, volume_name, backup_name, backup_body).result()
    wait_for_backup_created(client, rg, account_name, pool_name, volume_name, backup_name, live)
    return backup


def disable_backup(client, backup_name=TEST_BACKUP_1, rg=TEST_RG, account_name=TEST_ACC_1, pool_name=TEST_POOL_1,
                   volume_name=TEST_VOL_1, live=False):
    """Disable backups on a volume and wait until ``backup_name`` is gone.

    The volume is patched with ``backupEnabled`` set to ``False`` against the
    first vault listed for the account; afterwards the function polls until
    fetching ``backup_name`` raises (or the polling budget is exhausted).

    :param client: NetApp management client used for every call.
    :param backup_name: backup whose disappearance is awaited.
    :param rg: resource group name.
    :param account_name: NetApp account name.
    :param pool_name: capacity pool name.
    :param volume_name: volume name.
    :param live: when true, the wait helper sleeps between polls.
    :raises StopIteration: if the account has no vault.
    """
    vaults = client.vaults.list(rg, account_name)
    volume_patch = VolumePatch(data_protection={
        "backup": {
            "vaultId": next(vaults).id,
            "backupEnabled": False
        }
    })
    # Patch the volume that was actually requested, not the module defaults.
    client.volumes.begin_update(rg, account_name, pool_name, volume_name, volume_patch).wait()
    wait_for_no_backup(client, rg, account_name, pool_name, volume_name, backup_name, live)


def delete_backup(client, backup_name=TEST_BACKUP_1, rg=TEST_RG, account_name=TEST_ACC_1, pool_name=TEST_POOL_1,
                  volume_name=TEST_VOL_1, live=False):
    """Delete a single backup and wait until it can no longer be fetched."""
    client.backups.begin_delete(rg, account_name, pool_name, volume_name, backup_name).wait()
    wait_for_no_backup(client, rg, account_name, pool_name, volume_name, backup_name, live)


def get_backup(client, backup_name=TEST_BACKUP_1, rg=TEST_RG, account_name=TEST_ACC_1, pool_name=TEST_POOL_1,
               volume_name=TEST_VOL_1):
    """Return the backup ``backup_name`` of the given volume."""
    return client.backups.get(rg, account_name, pool_name, volume_name, backup_name)


def get_backup_list(client, rg=TEST_RG, account_name=TEST_ACC_1, pool_name=TEST_POOL_1, volume_name=TEST_VOL_1):
    """Return the result of listing the backups of the given volume."""
    return client.backups.list(rg, account_name, pool_name, volume_name)


def wait_for_no_backup(client, rg, account_name, pool_name, volume_name, backup_name, live=False):
    """Poll (at most 10 times) until fetching the backup raises.

    A workaround for the async nature of certain ARM processes. When ``live``
    is true each attempt is preceded by a 2 second sleep. The function returns
    silently if the backup still exists after the last attempt.
    """
    for _ in range(10):
        if live:
            time.sleep(2)
        try:
            client.backups.get(rg, account_name, pool_name, volume_name, backup_name)
        except Exception:
            # not found is an exception case (status code 200 expected)
            # and is actually what we are waiting for.
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit still propagate.
            break


def wait_for_backup_created(client, rg, account_name, pool_name, volume_name, backup_name, live=False):
    """Poll (at most 40 times) until the backup's provisioning state is ``Succeeded``.

    When ``live`` is true each attempt is preceded by a 10 second sleep. The
    function returns silently if the state is never reached; errors raised by
    the ``get`` call propagate to the caller.
    """
    for _ in range(40):
        if live:
            time.sleep(10)
        backup = client.backups.get(rg, account_name, pool_name, volume_name, backup_name)
        if backup.provisioning_state == "Succeeded":
            break


def clean_up(client, disable_bp=True, live=False):
    """Tear down the default test resources.

    Optionally disables backups on the default volume first, then deletes the
    default volume, pool and account, in that order.
    """
    if disable_bp:
        disable_backup(client, live=live)

    delete_volume(client, TEST_RG, TEST_ACC_1, TEST_POOL_1, TEST_VOL_1, live=live)
    delete_pool(client, TEST_RG, TEST_ACC_1, TEST_POOL_1, live=live)
    delete_account(client, TEST_RG, TEST_ACC_1, live=live)


class NetAppAccountTestCase(AzureMgmtTestCase):
    """Tests for the backup operations of the NetApp management client."""

    def setUp(self):
        super(NetAppAccountTestCase, self).setUp()
        self.client = self.create_mgmt_client(azure.mgmt.netapp.NetAppManagementClient)

    # Before tests are run live a resource group needs to be created along with vnet and subnet
    # Note that when tests are run in live mode it is best to run one test at a time.
    def test_create_delete_backup(self):
        # Create 2 backups since delete backups can only be used when volume has multiple backups
        create_backup(self.client, live=self.is_live)
        create_backup(self.client, backup_name=TEST_BACKUP_2, backup_only=True, live=self.is_live)
        backup_list = get_backup_list(self.client)
        self.assertEqual(len(list(backup_list)), 2)

        # delete the older backup since we are not able to delete the newest one with delete backup service
        delete_backup(self.client, live=self.is_live)
        backup_list = get_backup_list(self.client)
        self.assertEqual(len(list(backup_list)), 1)

        # automatically delete the second backup by disabling backups on the volume
        disable_backup(self.client, live=self.is_live)
        backup_list = get_backup_list(self.client)
        self.assertEqual(len(list(backup_list)), 0)

        clean_up(self.client, disable_bp=False, live=self.is_live)

    def test_list_backup(self):
        create_backup(self.client, live=self.is_live)
        create_backup(self.client, backup_name=TEST_BACKUP_2, backup_only=True, live=self.is_live)
        backup_list = get_backup_list(self.client)
        self.assertEqual(len(list(backup_list)), 2)
        # NOTE(review): ``backup_list`` was already consumed by ``list()``
        # above. If the pager is single-pass (presumably it is - confirm),
        # this loop never executes and the name checks are dead code. Enabling
        # them needs care: test_get_backup_by_name expects a fully qualified
        # name, which would not equal the bare names in ``backups``.
        for idx, backup in enumerate(backup_list):
            self.assertEqual(backup.name, backups[idx])

        disable_backup(self.client, live=self.is_live)
        disable_backup(self.client, backup_name=TEST_BACKUP_2, live=self.is_live)

        backup_list = get_backup_list(self.client)
        self.assertEqual(len(list(backup_list)), 0)

        clean_up(self.client, disable_bp=False, live=self.is_live)

    def test_get_backup_by_name(self):
        create_backup(self.client, live=self.is_live)

        backup = get_backup(self.client, TEST_BACKUP_1)
        # The service reports the name qualified by account, pool and volume.
        self.assertEqual(backup.name, TEST_ACC_1 + "/" + TEST_POOL_1 + "/" + TEST_VOL_1 + "/" + TEST_BACKUP_1)

        clean_up(self.client, live=self.is_live)

    @unittest.skip("Skipping test until able to update anyting")
    def test_update_backup(self):
        create_backup(self.client, live=self.is_live)

        tag = {'Tag1': 'Value1'}
        backup_body = BackupPatch(location=LOCATION, tags=tag)
        self.client.backups.begin_update(TEST_RG, TEST_ACC_1, TEST_POOL_1, TEST_VOL_1, TEST_BACKUP_1, backup_body).wait()

        backup = get_backup(self.client)
        # assertEqual reports both values on failure, unlike assertTrue(a == b).
        self.assertEqual(backup.tags['Tag1'], 'Value1')

        clean_up(self.client, live=self.is_live)

    @unittest.skip("Skipping test until this feature has faster performance. Today you have to wait 5 minute after backup creation until you can call backup status")
    def test_get_backup_status(self):
        create_backup(self.client, live=self.is_live)

        backup_status = self.client.backups.get_status(TEST_RG, TEST_ACC_1, TEST_POOL_1, TEST_VOL_1)
        self.assertTrue(backup_status.healthy)
        # SDK models expose snake_case attributes; ``mirrorState`` is only the wire name.
        self.assertEqual(backup_status.mirror_state, "Mirrored")

        clean_up(self.client, live=self.is_live)