1import time
2from azure.mgmt.resource import ResourceManagementClient
3from devtools_testutils import AzureMgmtTestCase
4from azure.mgmt.netapp.models import Backup, BackupPatch, VolumePatch
5from test_account import delete_account
6from test_volume import create_volume, wait_for_volume, delete_volume, delete_pool
7from setup import *
8import azure.mgmt.netapp.models
9import unittest
10
# Backup names used by the tests in this module; test_list_backup compares
# the names returned by the service against the entries of this list.
backups = [TEST_BACKUP_1, TEST_BACKUP_2]
12
13
def create_backup(client, backup_name=TEST_BACKUP_1, rg=TEST_RG, account_name=TEST_ACC_1, pool_name=TEST_POOL_1,
                  volume_name=TEST_VOL_1, location=LOCATION, backup_only=False, live=False):
    """Enable backup on a volume, create a backup of it and wait for it to succeed.

    Args:
        client: NetApp management client used for every service call.
        backup_name: Name of the backup to create.
        rg: Resource group name.
        account_name: NetApp account name.
        pool_name: Capacity pool name.
        volume_name: Name of the volume to back up.
        location: Location passed to the volume creation helper and the backup body.
        backup_only: When true, skip creating the volume and only create the backup.
        live: Forwarded to the wait helpers; they only sleep between polls when true.

    Returns:
        The result of the ``client.backups.begin_create`` poller.

    Raises:
        StopIteration: If no vault is listed for the account.
    """
    if not backup_only:
        create_volume(client, rg, account_name, pool_name, volume_name, location, vnet=VNET, live=live)
        wait_for_volume(client, rg, account_name, pool_name, volume_name, live)

    # Enable backup on the volume against the first vault listed for the account.
    vaults = client.vaults.list(rg, account_name)
    volume_patch = VolumePatch(data_protection={
        "backup": {
            "vaultId": next(vaults).id,
            "backupEnabled": True
        }
    })
    # Patch the volume the caller asked for (not the module-level defaults), so
    # the patched volume is always the one the backup is created on below.
    client.volumes.begin_update(rg, account_name, pool_name, volume_name, volume_patch).result()

    backup_body = Backup(location=location)
    backup = client.backups.begin_create(rg, account_name, pool_name, volume_name, backup_name, backup_body).result()
    wait_for_backup_created(client, rg, account_name, pool_name, volume_name, backup_name, live)
    return backup
32
33
def disable_backup(client, backup_name=TEST_BACKUP_1, rg=TEST_RG, account_name=TEST_ACC_1, pool_name=TEST_POOL_1,
                   volume_name=TEST_VOL_1, live=False):
    """Turn backup off on a volume and wait until the named backup can no longer be fetched.

    Args:
        client: NetApp management client used for every service call.
        backup_name: Backup whose disappearance is polled for after the patch.
        rg: Resource group name.
        account_name: NetApp account name.
        pool_name: Capacity pool name.
        volume_name: Name of the volume to patch.
        live: Forwarded to ``wait_for_no_backup``; it only sleeps between polls when true.

    Raises:
        StopIteration: If no vault is listed for the account.
    """
    # The patch references the first vault listed for the account.
    vaults = client.vaults.list(rg, account_name)
    volume_patch = VolumePatch(data_protection={
        "backup": {
            "vaultId": next(vaults).id,
            "backupEnabled": False
        }
    })
    # Patch the volume the caller asked for (not the module-level defaults), so
    # it matches the volume polled by wait_for_no_backup below.
    client.volumes.begin_update(rg, account_name, pool_name, volume_name, volume_patch).wait()
    wait_for_no_backup(client, rg, account_name, pool_name, volume_name, backup_name, live)
45
46
def delete_backup(client, backup_name=TEST_BACKUP_1, rg=TEST_RG, account_name=TEST_ACC_1, pool_name=TEST_POOL_1,
                  volume_name=TEST_VOL_1, live=False):
    """Delete one backup, then poll until fetching it raises (or polling gives up)."""
    # Both calls address the same backup, so build its coordinates once.
    target = (rg, account_name, pool_name, volume_name, backup_name)
    deletion = client.backups.begin_delete(*target)
    deletion.wait()
    wait_for_no_backup(client, *target, live=live)
51
52
def get_backup(client, backup_name=TEST_BACKUP_1, rg=TEST_RG, account_name=TEST_ACC_1, pool_name=TEST_POOL_1, volume_name=TEST_VOL_1):
    """Return the backup fetched by ``client.backups.get`` for the given volume and backup name."""
    backup_operations = client.backups
    fetched = backup_operations.get(rg, account_name, pool_name, volume_name, backup_name)
    return fetched
55
56
def get_backup_list(client, rg=TEST_RG, account_name=TEST_ACC_1, pool_name=TEST_POOL_1, volume_name=TEST_VOL_1):
    """Return whatever ``client.backups.list`` yields for the given volume."""
    backup_operations = client.backups
    listing = backup_operations.list(rg, account_name, pool_name, volume_name)
    return listing
59
60
def wait_for_no_backup(client, rg, account_name, pool_name, volume_name, backup_name, live=False):
    """Poll ``client.backups.get`` until it raises, giving up silently after 10 attempts.

    Args:
        client: Client object exposing ``backups.get``.
        rg: Resource group name.
        account_name: NetApp account name.
        pool_name: Capacity pool name.
        volume_name: Volume name.
        backup_name: Name of the backup expected to disappear.
        live: When true, sleep 2 seconds before each attempt.

    Returns:
        None, whether the backup disappeared or the attempts ran out.
    """
    # a workaround for the async nature of certain ARM processes
    for _ in range(10):
        if live:
            time.sleep(2)
        try:
            client.backups.get(rg, account_name, pool_name, volume_name, backup_name)
        except Exception:
            # not found is an exception case (status code 200 expected)
            # and is actually what we are waiting for.
            # Only Exception is caught so that KeyboardInterrupt / SystemExit
            # still stop the test run instead of being mistaken for "gone".
            break
74
def wait_for_backup_created(client, rg, account_name, pool_name, volume_name, backup_name, live=False):
    """Poll the backup until its provisioning state is "Succeeded", at most 40 times.

    When ``live`` is true, each attempt is preceded by a 10 second sleep.
    Returns None both on success and when the attempts run out; any exception
    raised by ``client.backups.get`` propagates to the caller.
    """
    for _attempt in range(40):
        if live:
            time.sleep(10)
        current = client.backups.get(rg, account_name, pool_name, volume_name, backup_name)
        if current.provisioning_state == "Succeeded":
            return
84
def clean_up(client, disable_bp=True, live=False):
    """Remove the default test volume, pool and account, optionally disabling backup first."""
    if disable_bp:
        disable_backup(client, live=live)

    # Each helper takes a prefix of the same resource path: the volume needs
    # all four names, the pool the first three, the account the first two.
    resource_path = (TEST_RG, TEST_ACC_1, TEST_POOL_1, TEST_VOL_1)
    teardown_steps = ((delete_volume, 4), (delete_pool, 3), (delete_account, 2))
    for remove, depth in teardown_steps:
        remove(client, *resource_path[:depth], live=live)
92
93
class NetAppAccountTestCase(AzureMgmtTestCase):
    """Tests for backup operations on a NetApp volume: create, list, get, update, status and delete."""

    def setUp(self):
        super(NetAppAccountTestCase, self).setUp()
        self.client = self.create_mgmt_client(azure.mgmt.netapp.NetAppManagementClient)

    # Before tests are run live a resource group needs to be created along with vnet and subnet
    # Note that when tests are run in live mode it is best to run one test at a time.
    def test_create_delete_backup(self):
        # Create 2 backups since delete backups can only be used when volume has multiple backups
        create_backup(self.client, live=self.is_live)
        create_backup(self.client, backup_name=TEST_BACKUP_2, backup_only=True, live=self.is_live)
        backup_list = get_backup_list(self.client)
        self.assertEqual(len(list(backup_list)), 2)

        # delete the older backup since we are not able to delete the newest one with delete backup service
        delete_backup(self.client, live=self.is_live)
        backup_list = get_backup_list(self.client)
        self.assertEqual(len(list(backup_list)), 1)

        # automatically delete the second backup by disabling backups on the volume
        disable_backup(self.client, live=self.is_live)
        backup_list = get_backup_list(self.client)
        self.assertEqual(len(list(backup_list)), 0)

        clean_up(self.client, disable_bp=False, live=self.is_live)

    def test_list_backup(self):
        create_backup(self.client, live=self.is_live)
        create_backup(self.client, backup_name=TEST_BACKUP_2, backup_only=True, live=self.is_live)

        # Materialize the listing once. It used to be consumed by list() for the
        # length check and then iterated a second time, so the name assertions
        # never executed.
        backup_list = list(get_backup_list(self.client))
        self.assertEqual(len(backup_list), 2)

        # NOTE(review): test_get_backup_by_name expects names of the form
        # "account/pool/volume/backup", so only the trailing segment is compared;
        # this passes whether the listing returns short or fully qualified names.
        # The comparison is order-independent because the listing order is not
        # established anywhere in this module - confirm if order matters.
        listed_names = sorted(backup.name.split("/")[-1] for backup in backup_list)
        self.assertEqual(listed_names, sorted(backups))

        disable_backup(self.client, live=self.is_live)
        disable_backup(self.client, backup_name=TEST_BACKUP_2, live=self.is_live)

        backup_list = get_backup_list(self.client)
        self.assertEqual(len(list(backup_list)), 0)

        clean_up(self.client, disable_bp=False, live=self.is_live)

    def test_get_backup_by_name(self):
        create_backup(self.client, live=self.is_live)

        backup = get_backup(self.client, TEST_BACKUP_1)
        self.assertEqual(backup.name, TEST_ACC_1 + "/" + TEST_POOL_1 + "/" + TEST_VOL_1 + "/" + TEST_BACKUP_1)

        clean_up(self.client, live=self.is_live)

    @unittest.skip("Skipping test until able to update anyting")
    def test_update_backup(self):
        create_backup(self.client, live=self.is_live)

        tag = {'Tag1': 'Value1'}
        backup_body = BackupPatch(location=LOCATION, tags=tag)
        self.client.backups.begin_update(TEST_RG, TEST_ACC_1, TEST_POOL_1, TEST_VOL_1, TEST_BACKUP_1, backup_body).wait()

        backup = get_backup(self.client)
        # assertEqual reports both values on failure, unlike assertTrue(a == b).
        self.assertEqual(backup.tags['Tag1'], 'Value1')

        clean_up(self.client, live=self.is_live)

    @unittest.skip("Skipping test until this feature has faster performance. Today you have to wait 5 minute after backup creation until you can call backup status")
    def test_get_backup_status(self):
        create_backup(self.client, live=self.is_live)

        backup_status = self.client.backups.get_status(TEST_RG, TEST_ACC_1, TEST_POOL_1, TEST_VOL_1)
        self.assertTrue(backup_status.healthy)
        # SDK model attributes are snake_case; "mirrorState" is only the wire name.
        self.assertEqual(backup_status.mirror_state, "Mirrored")

        clean_up(self.client, live=self.is_live)
170